This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a5667e82e25 [FLINK-27919] Add FLIP-238 data generator source
a5667e82e25 is described below

commit a5667e82e25cb87dc5523b82b08aec3e1408e9c6
Author: Alexander Fedulov <[email protected]>
AuthorDate: Wed Nov 23 16:33:08 2022 +0100

    [FLINK-27919] Add FLIP-238 data generator source
---
 .../docs/connectors/datastream/datagen.md          | 104 +++++++++
 .../docs/connectors/datastream/overview.md         |   1 +
 docs/content/docs/connectors/datastream/datagen.md | 104 +++++++++
 .../content/docs/connectors/datastream/overview.md |   1 +
 .../18509c9e-3250-4c52-91b9-11ccefc85db1           |   2 +-
 .../0fbe3123-5829-4891-93a5-a99bd8413fd9           |   0
 .../5a661a23-5b47-407c-9994-b6215a46c45c           |   0
 .../archunit-violations/stored.rules               |   4 +
 flink-connectors/flink-connector-datagen/pom.xml   |  78 +++++++
 .../datagen/source/DataGeneratorSource.java        | 195 ++++++++++++++++
 .../source/GeneratingIteratorSourceReader.java     |  78 +++++++
 .../datagen/source/GeneratorFunction.java          |  55 +++++
 .../source/GeneratorSourceReaderFactory.java       |  67 ++++++
 .../src/main/resources/log4j2.properties           |  25 +++
 .../architecture/TestCodeArchitectureTest.java     |  40 ++++
 .../datagen/source/DataGeneratorSourceITCase.java  | 250 +++++++++++++++++++++
 .../datagen/source/DataGeneratorSourceTest.java    |  91 +++++---
 .../src/test/resources/archunit.properties         |  31 +++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 .../file/src/FileSourceHeavyThroughputTest.java    |   5 +
 flink-connectors/pom.xml                           |   1 +
 .../apache/flink/api/connector/source/Source.java  |  16 +-
 .../api/connector/source/SourceReaderContext.java  |   9 +
 .../api/connector/source/SourceReaderFactory.java  |  42 ++++
 .../source/lib/util/IteratorSourceReader.java      | 134 +----------
 ...ceReader.java => IteratorSourceReaderBase.java} |  24 +-
 .../source/util/ratelimit/GatedRateLimiter.java    |  68 ++++++
 .../source/util/ratelimit/GuavaRateLimiter.java    |  48 ++++
 .../source/util/ratelimit/NoOpRateLimiter.java     |  34 +++
 .../util/ratelimit/RateLimitedSourceReader.java    | 110 +++++++++
 .../source/util/ratelimit/RateLimiter.java         |  47 ++++
 .../source/util/ratelimit/RateLimiterStrategy.java |  73 ++++++
 .../source/lib/NumberSequenceSourceTest.java       |   5 +
 flink-examples/flink-examples-streaming/pom.xml    |   6 +
 .../streaming/examples/datagen/DataGenerator.java  |  51 +++++
 .../datagen/DataGeneratorPerCheckpoint.java        |  56 +++++
 .../source/datagen/DataGeneratorSource.java        |   3 +
 .../streaming/api/operators/SourceOperator.java    |   5 +
 .../source/reader/TestingReaderContext.java        |   5 +
 flink-tests/pom.xml                                |   6 +
 .../source/lib/util/GatedRateLimiterTest.java      |  53 +++++
 .../lib/util/RateLimitedSourceReaderITCase.java    | 119 ++++++++++
 pom.xml                                            |   1 +
 43 files changed, 1893 insertions(+), 182 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/datagen.md 
b/docs/content.zh/docs/connectors/datastream/datagen.md
new file mode 100644
index 00000000000..af3129357ed
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/datagen.md
@@ -0,0 +1,104 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-----
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data
+generation process by supplying "index" values of type `Long` to the 
user-provided
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of `Long` 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + 
index;
+long numberOfRecords = 1000;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, numberOfRecords, 
Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+        WatermarkStrategy.noWatermarks(),
+        "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+Rate Limiting
+-----
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce a stream of
+`Long` values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+double recordsPerSecond = 100;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+             generatorFunction,
+             Long.MAX_VALUE,
+             RateLimiterStrategy.perSecond(recordsPerSecond),
+             Types.LONG);
+```
+
+Additional rate limiting strategies, such as limiting the number of records 
emitted per checkpoint, can
+be found in {{< javadoc name="RateLimiterStrategy" 
file="org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.html">}}.
+
+Boundedness
+-----
+This source is always bounded. From a practical perspective, however, setting 
the number of records
+to `Long.MAX_VALUE` turns it into an effectively unbounded source (the end 
will never be reached). For finite sequences users may want to consider running 
the application in [`BATCH` execution mode]({{< ref 
"docs/dev/datastream/execution_mode" 
>}}#when-canshould-i-use-batch-execution-mode)
+.
+
+Notes
+-----
+
+{{< hint info >}}
+**Note:**  `DataGeneratorSource` can be used to implement Flink jobs with 
at-least-once and
+end-to-end exactly-once processing guarantees under the condition that the 
output of the `GeneratorFunction`
+is deterministic with respect to its input, in other words supplying the same 
`Long` number always
+leads to generating the same output.
+{{< /hint >}}
+
+{{< hint info >}}
+**Note:**  it is possible to also produce deterministic watermarks right at the
+source based on the generated events and a custom {{< javadoc 
name="WatermarkStrategy" 
file="org/apache/flink/api/common/eventtime/WatermarkStrategy.html">}}.  
+{{< /hint >}}
+
+
+
diff --git a/docs/content.zh/docs/connectors/datastream/overview.md 
b/docs/content.zh/docs/connectors/datastream/overview.md
index d394302cf56..9c6ffe0af38 100644
--- a/docs/content.zh/docs/connectors/datastream/overview.md
+++ b/docs/content.zh/docs/connectors/datastream/overview.md
@@ -41,6 +41,7 @@ under the License.
  * [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}}) 
(sink)
  * [Amazon Kinesis Data Streams]({{< ref "docs/connectors/datastream/kinesis" 
>}}) (source/sink)
  * [Amazon Kinesis Data Firehose]({{< ref 
"docs/connectors/datastream/firehose" >}}) (sink)
+ * [DataGen]({{< ref "docs/connectors/datastream/datagen" >}}) (source)
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) 
(sink)
  * [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) (sink)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
diff --git a/docs/content/docs/connectors/datastream/datagen.md 
b/docs/content/docs/connectors/datastream/datagen.md
new file mode 100644
index 00000000000..3847982550e
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/datagen.md
@@ -0,0 +1,104 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-----
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type `Long` to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of `Long` 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + 
index;
+long numberOfRecords = 1000;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, numberOfRecords, 
Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+        WatermarkStrategy.noWatermarks(),
+        "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+Rate Limiting
+-----
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce a stream of
+`Long` values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+double recordsPerSecond = 100;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+             generatorFunction,
+             Long.MAX_VALUE,
+             RateLimiterStrategy.perSecond(recordsPerSecond),
+             Types.LONG);
+```
+
+Additional rate limiting strategies, such as limiting the number of records 
emitted per checkpoint, can 
+be found in {{< javadoc name="RateLimiterStrategy" 
file="org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.html">}}.
 
+
+Boundedness
+-----
+This source is always bounded. From a practical perspective, however, setting 
the number of records
+to `Long.MAX_VALUE` turns it into an effectively unbounded source (the end 
will never be reached). For finite sequences users may want to consider running 
the application in [`BATCH` execution mode]({{< ref 
"docs/dev/datastream/execution_mode" 
>}}#when-canshould-i-use-batch-execution-mode)
+.
+
+Notes
+-----
+
+{{< hint info >}}
+**Note:**  `DataGeneratorSource` can be used to implement Flink jobs with 
at-least-once and
+end-to-end exactly-once processing guarantees under the condition that the 
output of the `GeneratorFunction`
+is deterministic with respect to its input, in other words supplying the same 
`Long` number always
+leads to generating the same output.
+{{< /hint >}}
+
+{{< hint info >}}
+**Note:**  it is possible to also produce deterministic watermarks right at the
+source based on the generated events and a custom {{< javadoc 
name="WatermarkStrategy" 
file="org/apache/flink/api/common/eventtime/WatermarkStrategy.html">}}.  
+{{< /hint >}}
+
+
+
diff --git a/docs/content/docs/connectors/datastream/overview.md 
b/docs/content/docs/connectors/datastream/overview.md
index 8c6707b6095..65e61abd0f7 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -42,6 +42,7 @@ Connectors provide code for interfacing with various 
third-party systems. Curren
  * [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}}) 
(sink)
  * [Amazon Kinesis Data Streams]({{< ref "docs/connectors/datastream/kinesis" 
>}}) (source/sink)
  * [Amazon Kinesis Data Firehose]({{< ref 
"docs/connectors/datastream/firehose" >}}) (sink)
+ * [DataGen]({{< ref "docs/connectors/datastream/datagen" >}}) (source)
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) 
(sink)
  * [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) 
(source/sink)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
index 3a09bcb8547..e2812cfad3a 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
@@ -43,7 +43,7 @@ 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.getSplitSerialize
 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.getSplitSerializer():
 Returned leaf type org.apache.flink.core.io.SimpleVersionedSerializer does not 
satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.restoreEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 java.util.Collection): Argument leaf type 
org.apache.flink.api.connector.source.lib.NumberSequenceSource$NumberSequenceSplit
 does not satisfy: reside outside of package 'org.apache.flink..' or reside in 
any package ['..shaded..'] or annotated with @Public or annotated with 
@Deprecated
 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.restoreEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 java.util.Collection): Returned leaf type 
org.apache.flink.api.connector.source.lib.NumberSequenceSource$NumberSequenceSplit
 does not satisfy: reside outside of package 'org.apache.flink..' or reside in 
any package ['..shaded..'] or annotated with @Public or annotated with 
@Deprecated
-org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(org.apache.flink.api.connector.source.ReaderOutput):
 Returned leaf type org.apache.flink.core.io.InputStatus does not satisfy: 
reside outside of package 'org.apache.flink..' or reside in any package 
['..shaded..'] or annotated with @Public or annotated with @Deprecated
+org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(org.apache.flink.api.connector.source.ReaderOutput):
 Returned leaf type org.apache.flink.core.io.InputStatus does not satisfy: 
reside outside of package 'org.apache.flink..' or reside in any package 
['..shaded..'] or annotated with @Public or annotated with @Deprecated
 org.apache.flink.api.java.DataSet.coGroup(org.apache.flink.api.java.DataSet): 
Returned leaf type 
org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets does 
not satisfy: reside outside of package 'org.apache.flink..' or reside in any 
package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
 
org.apache.flink.api.java.DataSet.write(org.apache.flink.api.common.io.FileOutputFormat,
 java.lang.String, org.apache.flink.core.fs.FileSystem$WriteMode): Argument 
leaf type org.apache.flink.core.fs.FileSystem$WriteMode does not satisfy: 
reside outside of package 'org.apache.flink..' or reside in any package 
['..shaded..'] or annotated with @Public or annotated with @Deprecated
 org.apache.flink.api.java.DataSet.writeAsCsv(java.lang.String, 
java.lang.String, java.lang.String, 
org.apache.flink.core.fs.FileSystem$WriteMode): Argument leaf type 
org.apache.flink.core.fs.FileSystem$WriteMode does not satisfy: reside outside 
of package 'org.apache.flink..' or reside in any package ['..shaded..'] or 
annotated with @Public or annotated with @Deprecated
diff --git 
a/flink-connectors/flink-connector-datagen/archunit-violations/0fbe3123-5829-4891-93a5-a99bd8413fd9
 
b/flink-connectors/flink-connector-datagen/archunit-violations/0fbe3123-5829-4891-93a5-a99bd8413fd9
new file mode 100644
index 00000000000..e69de29bb2d
diff --git 
a/flink-connectors/flink-connector-datagen/archunit-violations/5a661a23-5b47-407c-9994-b6215a46c45c
 
b/flink-connectors/flink-connector-datagen/archunit-violations/5a661a23-5b47-407c-9994-b6215a46c45c
new file mode 100644
index 00000000000..e69de29bb2d
diff --git 
a/flink-connectors/flink-connector-datagen/archunit-violations/stored.rules 
b/flink-connectors/flink-connector-datagen/archunit-violations/stored.rules
new file mode 100644
index 00000000000..687f775c531
--- /dev/null
+++ b/flink-connectors/flink-connector-datagen/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Sun Oct 09 19:59:56 CEST 2022
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ 
extension=5a661a23-5b47-407c-9994-b6215a46c45c
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ 
ITCase=0fbe3123-5829-4891-93a5-a99bd8413fd9
diff --git a/flink-connectors/flink-connector-datagen/pom.xml 
b/flink-connectors/flink-connector-datagen/pom.xml
new file mode 100644
index 00000000000..1d70814417f
--- /dev/null
+++ b/flink-connectors/flink-connector-datagen/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.17-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>flink-connector-datagen</artifactId>
+       <name>Flink : Connectors : Datagen</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- test dependency -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+
+               <!-- ArchUnit test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-architecture-tests-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+</project>
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
new file mode 100644
index 00000000000..8f128414efb
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * <p>Users can supply a {@code GeneratorFunction} for mapping the 
(sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce 
the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + 
index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }</pre>
+ *
+ * <p>The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * <p>Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * <p>This source has built-in support for rate limiting. The following code 
will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never 
be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 
events per second.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ *         new DataGeneratorSource<>(
+ *              generatorFunction,
+ *              Long.MAX_VALUE,
+ *              RateLimiterStrategy.perSecond(100),
+ *              Types.LONG);
+ * }</pre>
+ *
+ * <p>This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, the 
end bound is pretty
+ * far away.
+ */
+@Experimental
+public class DataGeneratorSource<OUT>
+        implements Source<OUT, NumberSequenceSplit, 
Collection<NumberSequenceSplit>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SourceReaderFactory<OUT, NumberSequenceSplit> 
sourceReaderFactory;
+    private final TypeInformation<OUT> typeInfo;
+
+    private final NumberSequenceSource numberSource;
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            TypeInformation<OUT> typeInfo) {
+        this(generatorFunction, count, RateLimiterStrategy.noOp(), typeInfo);
+    }
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param rateLimiterStrategy The strategy for rate limiting.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            RateLimiterStrategy rateLimiterStrategy,
+            TypeInformation<OUT> typeInfo) {
+        this(
+                new GeneratorSourceReaderFactory<>(generatorFunction, 
rateLimiterStrategy),
+                count,
+                typeInfo);
+        ClosureCleaner.clean(
+                generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        ClosureCleaner.clean(
+                rateLimiterStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+    }
+
+    private DataGeneratorSource(
+            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
+            long count,
+            TypeInformation<OUT> typeInfo) {
+        this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
+        ClosureCleaner.clean(
+                sourceReaderFactory, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        this.typeInfo = checkNotNull(typeInfo);
+        this.numberSource = new NumberSequenceSource(0, count - 1);
+    }
+
+    // ------------------------------------------------------------------------
+    //  source methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return typeInfo;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<OUT, NumberSequenceSplit> 
createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return sourceReaderFactory.createReader(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>> restoreEnumerator(
+            SplitEnumeratorContext<NumberSequenceSplit> enumContext,
+            Collection<NumberSequenceSplit> checkpoint) {
+        return numberSource.restoreEnumerator(enumContext, checkpoint);
+    }
+
+    @Override
+    public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>> createEnumerator(
+            final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+        return numberSource.createEnumerator(enumContext);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() 
{
+        return numberSource.getSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<NumberSequenceSplit>>
+            getEnumeratorCheckpointSerializer() {
+        return numberSource.getEnumeratorCheckpointSerializer();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratingIteratorSourceReader.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratingIteratorSourceReader.java
new file mode 100644
index 00000000000..b46f93c6b83
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratingIteratorSourceReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code SourceReader} that takes the values of an iterator, supplied via an {@link
+ * IteratorSourceSplit}, and applies a {@link GeneratorFunction} to them to perform arbitrary
+ * transformations.
+ */
+@Experimental
+public class GeneratingIteratorSourceReader<
+                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
+        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {
+
+    private final GeneratorFunction<E, O> generatorFunction;
+
+    public GeneratingIteratorSourceReader(
+            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {
+        super(context);
+        this.generatorFunction = checkNotNull(generatorFunction);
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Applies the generator function to a single input element.
+     *
+     * <p>Any exception thrown by the user-provided function is wrapped into a {@link
+     * FlinkRuntimeException} that carries the offending input value, so failures are surfaced
+     * instead of being swallowed by the reader loop.
+     */
+    @Override
+    protected O convert(E value) {
+        try {
+            return generatorFunction.map(value);
+        } catch (Exception e) {
+            // Pass the value to "%s" directly instead of calling value.toString(): a null input
+            // must not mask the original generator exception with a NullPointerException.
+            String message =
+                    String.format(
+                            "A user-provided generator function threw an exception on this input: %s",
+                            value);
+            throw new FlinkRuntimeException(message, e);
+        }
+    }
+
+    /** Opens the generator function before any records are produced. */
+    @Override
+    public void start(SourceReaderContext context) {
+        try {
+            generatorFunction.open(context);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to open the GeneratorFunction", e);
+        }
+    }
+
+    /** Closes the generator function first, then the underlying reader. */
+    @Override
+    public void close() throws Exception {
+        generatorFunction.close();
+        super.close();
+    }
+}
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorFunction.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorFunction.java
new file mode 100644
index 00000000000..096be1fe277
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+/**
+ * Base interface for data generator functions. Data generator functions take elements and transform
+ * them, element-wise. They are the core building block of the {@link DataGeneratorSource} that
+ * drives the data generation process by supplying "index" values of type Long. It makes it possible
+ * to produce specific elements at concrete positions of the generated data stream.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ * }</pre>
+ *
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@Experimental
+@FunctionalInterface
+public interface GeneratorFunction<T, O> extends Function {
+
+    /**
+     * Initialization method for the function. It is called once before the actual data mapping
+     * methods.
+     *
+     * @param readerContext runtime context of the source reader executing this function
+     * @throws Exception if initialization fails; the failure is surfaced by the source reader
+     */
+    default void open(SourceReaderContext readerContext) throws Exception {}
+
+    /** Tear-down method for the function, called once when the source reader is closed. */
+    default void close() throws Exception {}
+
+    /**
+     * Maps one input element to one generated record. This is the single abstract method, so the
+     * interface can be implemented as a lambda.
+     *
+     * @param value the input element (for {@link DataGeneratorSource}, a sequence index)
+     * @return the generated record
+     * @throws Exception if record generation fails
+     */
+    O map(T value) throws Exception;
+}
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
new file mode 100644
index 00000000000..f3888d3d1de
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for instantiating source readers that produce elements by applying a user-supplied
+ * {@link GeneratorFunction}.
+ *
+ * @param <OUT> The type of the output elements.
+ */
+@Internal
+class GeneratorSourceReaderFactory<OUT>
+        implements SourceReaderFactory<OUT, NumberSequenceSource.NumberSequenceSplit> {
+
+    private final GeneratorFunction<Long, OUT> generatorFunction;
+    private final RateLimiterStrategy rateLimiterStrategy;
+
+    /**
+     * Instantiates a new {@code GeneratorSourceReaderFactory}.
+     *
+     * @param generatorFunction The generator function.
+     * @param rateLimiterStrategy The rate limiter strategy.
+     */
+    public GeneratorSourceReaderFactory(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            RateLimiterStrategy rateLimiterStrategy) {
+        this.generatorFunction = checkNotNull(generatorFunction);
+        this.rateLimiterStrategy = checkNotNull(rateLimiterStrategy);
+    }
+
+    /**
+     * Wraps a {@link GeneratingIteratorSourceReader} into a {@link RateLimitedSourceReader} whose
+     * limiter is created from this factory's strategy.
+     */
+    @Override
+    public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit> createReader(
+            SourceReaderContext readerContext) {
+        // The strategy receives the current parallelism so it can split the overall rate budget
+        // among the parallel reader subtasks.
+        final RateLimiter limiter =
+                rateLimiterStrategy.createRateLimiter(readerContext.currentParallelism());
+        return new RateLimitedSourceReader<>(
+                new GeneratingIteratorSourceReader<>(readerContext, generatorFunction), limiter);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/resources/log4j2.properties 
b/flink-connectors/flink-connector-datagen/src/main/resources/log4j2.properties
new file mode 100644
index 00000000000..c64a340a8dc
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git 
a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
 
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 00000000000..96669b8ac82
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+        packages = "org.apache.flink.connector.datagen",
+        importOptions = {
+            ImportOption.OnlyIncludeTests.class,
+            ImportOptions.ExcludeScalaImportOption.class,
+            ImportOptions.ExcludeShadedImportOption.class
+        })
+public class TestCodeArchitectureTest {
+
+    // Pulls in the shared set of architecture rules that apply to all connector test code.
+    @ArchTest
+    public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git 
a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
 
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
new file mode 100644
index 00000000000..76f5eca8494
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static java.util.stream.Collectors.summingInt;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+class DataGeneratorSourceITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    // NOTE: JUnit 5 requires @RegisterExtension fields to be non-private; a private field is
+    // rejected by the extension mechanism, so the field is declared package-private here.
+    @RegisterExtension
+    static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    @DisplayName("Combined results of parallel source readers produce the expected sequence.")
+    void testParallelSourceExecution() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        final DataStream<Long> stream = getGeneratorSourceStream(index -> index, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Generator function can be instantiated as an anonymous class.")
+    void testParallelSourceExecutionWithAnonymousClass() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                new GeneratorFunction<Long, Long>() {
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream = getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function are not 'swallowed'.")
+    void testFailingGeneratorFunction() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                value -> {
+                    throw new Exception("boom");
+                };
+
+        final DataStream<Long> stream = getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        assertThatThrownBy(
+                        () -> {
+                            stream.executeAndCollect(10000);
+                        })
+                .satisfies(anyCauseMatches("exception on this input:"))
+                .satisfies(anyCauseMatches("boom"));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function initialization are not 'swallowed'.")
+    // FIXME: failure details are swallowed by Flink
+    // Full details are still available at this line:
+    // https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758
+    // But the execution falls through to the line below and discards the root cause of
+    // cancelling the source invokable without recording it:
+    // https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780
+    @Disabled("Flink currently discards the root cause of the source cancellation; see FIXME above.")
+    void testFailingGeneratorFunctionInitialization() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunctionFailingInit =
+                new GeneratorFunction<Long, Long>() {
+                    @Override
+                    public void open(SourceReaderContext readerContext) throws Exception {
+                        throw new Exception("boom");
+                    }
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream =
+                getGeneratorSourceStream(generatorFunctionFailingInit, env, 1_000L);
+
+        assertThatThrownBy(
+                        () -> {
+                            stream.executeAndCollect(10000);
+                        })
+                .satisfies(anyCauseMatches("Failed to open"))
+                .satisfies(anyCauseMatches("boom"));
+    }
+
+    @Test
+    @DisplayName(
+            "Result is correct when less elements are expected than the number of parallel source readers")
+    void testLessSplitsThanParallelism() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        int n = PARALLELISM - 2;
+        DataStream<Long> stream = getGeneratorSourceStream(index -> index, env, n).map(l -> l);
+
+        List<Long> result = stream.executeAndCollect(100);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, n - 1));
+    }
+
+    @Test
+    @DisplayName("Test GatedRateLimiter")
+    void testGatedRateLimiter() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100);
+
+        env.setParallelism(PARALLELISM);
+
+        int capacityPerSubtaskPerCycle = 2;
+        int capacityPerCycle = // avoid rounding errors when spreading records among subtasks
+                PARALLELISM * capacityPerSubtaskPerCycle;
+
+        final GeneratorFunction<Long, Long> generatorFunction = index -> 1L;
+
+        // Allow each subtask to produce at least 3 cycles, gated by checkpoints
+        int count = capacityPerCycle * 3;
+        final DataGeneratorSource<Long> generatorSource =
+                new DataGeneratorSource<>(
+                        generatorFunction,
+                        count,
+                        RateLimiterStrategy.perCheckpoint(capacityPerCycle),
+                        Types.LONG);
+
+        final DataStreamSource<Long> streamSource =
+                env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
+        final DataStream<Tuple2<Integer, Long>> map =
+                streamSource.map(new SubtaskAndCheckpointMapper());
+        final List<Tuple2<Integer, Long>> results = map.executeAndCollect(1000);
+
+        // Count records per (subtask, checkpoint) pair; each pair must have emitted exactly its
+        // per-cycle share of the overall rate budget.
+        final Map<Tuple2<Integer, Long>, Integer> collect =
+                results.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        x -> (new Tuple2<>(x.f0, x.f1)), summingInt(x -> 1)));
+        for (Map.Entry<Tuple2<Integer, Long>, Integer> entry : collect.entrySet()) {
+            assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle);
+        }
+    }
+
+    /** Tags each record with the subtask index and the last completed checkpoint id. */
+    private static class SubtaskAndCheckpointMapper
+            extends RichMapFunction<Long, Tuple2<Integer, Long>> implements CheckpointListener {
+
+        private long checkpointId = 0;
+        private int subtaskIndex;
+
+        @Override
+        public void open(Configuration parameters) {
+            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+        }
+
+        @Override
+        public Tuple2<Integer, Long> map(Long value) {
+            return new Tuple2<>(subtaskIndex, checkpointId);
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            this.checkpointId = checkpointId;
+        }
+    }
+
+    /** Builds a stream from a {@code DataGeneratorSource} over the given generator function. */
+    private DataStream<Long> getGeneratorSourceStream(
+            GeneratorFunction<Long, Long> generatorFunction,
+            StreamExecutionEnvironment env,
+            long count) {
+        DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunction, count, Types.LONG);
+
+        return env.fromSource(
+                dataGeneratorSource, WatermarkStrategy.noWatermarks(), "generator source");
+    }
+
+    /** Returns the boxed inclusive range [startInclusive, endInclusive]. */
+    private List<Long> range(int startInclusive, int endInclusive) {
+        return LongStream.rangeClosed(startInclusive, endInclusive)
+                .boxed()
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
 
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
similarity index 63%
copy from 
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
copy to 
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
index 5944201d1ad..2caf93f6e08 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ 
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
@@ -16,14 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.source.lib;
+package org.apache.flink.connector.datagen.source;
 
 import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
@@ -31,19 +35,57 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.SimpleUserCodeClassLoader;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import org.junit.Test;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.stream.LongStream;
 
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for the {@link NumberSequenceSource}. */
-public class NumberSequenceSourceTest {
+/** Tests for the {@link DataGeneratorSource}. */
+class DataGeneratorSourceTest {
 
     @Test
-    public void testReaderCheckpoints() throws Exception {
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index 
-> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+        final int parallelism = 2;
+        final 
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, 
but should logically be
+        // executed in this order (protect against any breaking changes in the 
start() method).
+        enumerator.start();
+
+        Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+                enumerator.snapshotState(0);
+        assertThat(enumeratorState).hasSize(parallelism);
+
+        enumerator = dataGeneratorSource.restoreEnumerator(context, 
enumeratorState);
+
+        // Verify that splits were restored and can be assigned
+        assertThat(context.getSplitsAssignmentSequence()).isEmpty();
+        for (NumberSequenceSource.NumberSequenceSplit ignored : 
enumeratorState) {
+            enumerator.handleSplitRequest(0, "hostname");
+        }
+        
assertThat(context.getSplitsAssignmentSequence()).hasSize(enumeratorState.size());
+    }
+
+    @Test
+    @DisplayName("Uses the underlying NumberSequenceSource correctly for 
checkpointing.")
+    void testReaderCheckpoints() throws Exception {
         final long from = 177;
         final long mid = 333;
         final long to = 563;
@@ -75,33 +117,19 @@ public class NumberSequenceSourceTest {
         }
 
         final List<Long> result = out.getEmittedRecords();
-        validateSequence(result, from, to);
-    }
-
-    private static void validateSequence(
-            final List<Long> sequence, final long from, final long to) {
-        if (sequence.size() != to - from + 1) {
-            failSequence(sequence, from, to);
-        }
+        final Iterable<Long> expected = LongStream.range(from, to + 
1)::iterator;
 
-        long nextExpected = from;
-        for (Long next : sequence) {
-            if (next != nextExpected++) {
-                failSequence(sequence, from, to);
-            }
-        }
-    }
-
-    private static void failSequence(final List<Long> sequence, final long 
from, final long to) {
-        fail(
-                String.format(
-                        "Expected: A sequence [%d, %d], but found: sequence 
(size %d) : %s",
-                        from, to, sequence.size(), sequence));
+        assertThat(result).containsExactlyElementsOf(expected);
     }
 
-    private static SourceReader<Long, 
NumberSequenceSource.NumberSequenceSplit> createReader() {
+    private static SourceReader<Long, 
NumberSequenceSource.NumberSequenceSplit> createReader()
+            throws Exception {
         // the arguments passed in the source constructor matter only to the 
enumerator
-        return new NumberSequenceSource(0L, 0L).createReader(new 
DummyReaderContext());
+        GeneratorFunction<Long, Long> generatorFunctionStateless = index -> 
index;
+        DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 
Long.MAX_VALUE, Types.LONG);
+
+        return dataGeneratorSource.createReader(new DummyReaderContext());
     }
 
     // ------------------------------------------------------------------------
@@ -143,6 +171,11 @@ public class NumberSequenceSourceTest {
         public UserCodeClassLoader getUserCodeClassLoader() {
             return 
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
         }
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
     }
 
     private static final class TestingReaderOutput<E> implements 
ReaderOutput<E> {
diff --git 
a/flink-connectors/flink-connector-datagen/src/test/resources/archunit.properties
 
b/flink-connectors/flink-connector-datagen/src/test/resources/archunit.properties
new file mode 100644
index 00000000000..15be88c95ba
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new 
violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the 
initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was 
correct to flag a new
+#       violation, please try to avoid creating the violation. If the 
violation was created due to a
+#       shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git 
a/flink-connectors/flink-connector-datagen/src/test/resources/log4j2-test.properties
 
b/flink-connectors/flink-connector-datagen/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..c4fa18706ff
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index 3c175a80c8d..bb44462ab05 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -225,6 +225,11 @@ class FileSourceHeavyThroughputTest {
         public UserCodeClassLoader getUserCodeClassLoader() {
             return 
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
         }
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
     }
 
     private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 6c2a814430b..f05ff1e1b7c 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -53,6 +53,7 @@ under the License.
                <module>flink-file-sink-common</module>
                <module>flink-connector-files</module>
                <module>flink-connector-pulsar</module>
+               <module>flink-connector-datagen</module>
        </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end 
up
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
index 899469f00ed..096e0d67376 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
@@ -21,8 +21,6 @@ package org.apache.flink.api.connector.source;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-import java.io.Serializable;
-
 /**
  * The interface for Source. It acts like a factory class that helps construct 
the {@link
  * SplitEnumerator} and {@link SourceReader} and corresponding serializers.
@@ -32,7 +30,8 @@ import java.io.Serializable;
  * @param <EnumChkT> The type of the enumerator checkpoints.
  */
 @Public
-public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends 
Serializable {
+public interface Source<T, SplitT extends SourceSplit, EnumChkT>
+        extends SourceReaderFactory<T, SplitT> {
 
     /**
      * Get the boundedness of this source.
@@ -41,17 +40,6 @@ public interface Source<T, SplitT extends SourceSplit, 
EnumChkT> extends Seriali
      */
     Boundedness getBoundedness();
 
-    /**
-     * Creates a new reader to read data from the splits it gets assigned. The 
reader starts fresh
-     * and does not have any state to resume.
-     *
-     * @param readerContext The {@link SourceReaderContext context} for the 
source reader.
-     * @return A new SourceReader.
-     * @throws Exception The implementor is free to forward all exceptions 
directly. Exceptions
-     *     thrown from this method cause task failure/recovery.
-     */
-    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) 
throws Exception;
-
     /**
      * Creates a new SplitEnumerator for this source, starting a new input.
      *
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 0d25ec6bb3d..08c64501d9a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -63,4 +63,13 @@ public interface SourceReaderContext {
      * @see UserCodeClassLoader
      */
     UserCodeClassLoader getUserCodeClassLoader();
+
+    /**
+     * Get the current parallelism of this Source.
+     *
+     * @return the parallelism of the Source.
+     */
+    default int currentParallelism() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java
new file mode 100644
index 00000000000..e974def7b03
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+
+/**
+ * A factory for creating source reader instances.
+ *
+ * @param <T> The type of the output elements.
+ */
+@Public
+public interface SourceReaderFactory<T, SplitT extends SourceSplit> extends 
Serializable {
+    /**
+     * Creates a new reader to read data from the splits it gets assigned. The 
reader starts fresh
+     * and does not have any state to resume.
+     *
+     * @param readerContext The {@link SourceReaderContext context} for the 
source reader.
+     * @return A new SourceReader.
+     * @throws Exception The implementor is free to forward all exceptions 
directly. Exceptions
+     *     thrown from this method cause task failure/recovery.
+     */
+    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) 
throws Exception;
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index d7a63c06a18..f6a567e90b9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -19,22 +19,10 @@
 package org.apache.flink.api.connector.source.lib.util;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.core.io.InputStatus;
 
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@link SourceReader} that returns the values of an iterator, supplied via 
an {@link
@@ -52,128 +40,14 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Public
 public class IteratorSourceReader<
                 E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
-        implements SourceReader<E, SplitT> {
-
-    /** The context for this reader, to communicate with the enumerator. */
-    private final SourceReaderContext context;
-
-    /** The availability future. This reader is available as soon as a split 
is assigned. */
-    private CompletableFuture<Void> availability;
-
-    /**
-     * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
-     * non-null always together with the {@link #currentSplit} field.
-     */
-    @Nullable private IterT iterator;
-
-    /**
-     * The split whose data we return. Non-null after a split has been 
assigned. This field is null
-     * or non-null always together with the {@link #iterator} field.
-     */
-    @Nullable private SplitT currentSplit;
-
-    /** The remaining splits that were assigned but not yet processed. */
-    private final Queue<SplitT> remainingSplits;
-
-    private boolean noMoreSplits;
+        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {
 
     public IteratorSourceReader(SourceReaderContext context) {
-        this.context = checkNotNull(context);
-        this.availability = new CompletableFuture<>();
-        this.remainingSplits = new ArrayDeque<>();
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public void start() {
-        // request a split if we don't have one
-        if (remainingSplits.isEmpty()) {
-            context.sendSplitRequest();
-        }
+        super(context);
     }
 
     @Override
-    public InputStatus pollNext(ReaderOutput<E> output) {
-        if (iterator != null) {
-            if (iterator.hasNext()) {
-                output.collect(iterator.next());
-                return InputStatus.MORE_AVAILABLE;
-            } else {
-                finishSplit();
-            }
-        }
-
-        return tryMoveToNextSplit();
+    protected E convert(E value) {
+        return value;
     }
-
-    private void finishSplit() {
-        iterator = null;
-        currentSplit = null;
-
-        // request another split if no other is left
-        // we do this only here in the finishSplit part to avoid requesting a 
split
-        // whenever the reader is polled and doesn't currently have a split
-        if (remainingSplits.isEmpty() && !noMoreSplits) {
-            context.sendSplitRequest();
-        }
-    }
-
-    private InputStatus tryMoveToNextSplit() {
-        currentSplit = remainingSplits.poll();
-        if (currentSplit != null) {
-            iterator = currentSplit.getIterator();
-            return InputStatus.MORE_AVAILABLE;
-        } else if (noMoreSplits) {
-            return InputStatus.END_OF_INPUT;
-        } else {
-            // ensure we are not called in a loop by resetting the 
availability future
-            if (availability.isDone()) {
-                availability = new CompletableFuture<>();
-            }
-
-            return InputStatus.NOTHING_AVAILABLE;
-        }
-    }
-
-    @Override
-    public CompletableFuture<Void> isAvailable() {
-        return availability;
-    }
-
-    @Override
-    public void addSplits(List<SplitT> splits) {
-        remainingSplits.addAll(splits);
-        // set availability so that pollNext is actually called
-        availability.complete(null);
-    }
-
-    @Override
-    public void notifyNoMoreSplits() {
-        noMoreSplits = true;
-        // set availability so that pollNext is actually called
-        availability.complete(null);
-    }
-
-    @Override
-    public List<SplitT> snapshotState(long checkpointId) {
-        if (currentSplit == null && remainingSplits.isEmpty()) {
-            return Collections.emptyList();
-        }
-
-        final ArrayList<SplitT> allSplits = new ArrayList<>(1 + 
remainingSplits.size());
-        if (iterator != null && iterator.hasNext()) {
-            assert currentSplit != null;
-
-            @SuppressWarnings("unchecked")
-            final SplitT inProgressSplit =
-                    (SplitT) currentSplit.getUpdatedSplitForIterator(iterator);
-            allSplits.add(inProgressSplit);
-        }
-        allSplits.addAll(remainingSplits);
-        return allSplits;
-    }
-
-    @Override
-    public void close() throws Exception {}
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java
similarity index 89%
copy from 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
copy to 
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java
index d7a63c06a18..1e6237881a5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java
@@ -50,9 +50,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *     the iterator that produces this reader's elements.
  */
 @Public
-public class IteratorSourceReader<
-                E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
-        implements SourceReader<E, SplitT> {
+public abstract class IteratorSourceReaderBase<
+                E, O, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
+        implements SourceReader<O, SplitT> {
 
     /** The context for this reader, to communicate with the enumerator. */
     private final SourceReaderContext context;
@@ -77,7 +77,7 @@ public class IteratorSourceReader<
 
     private boolean noMoreSplits;
 
-    public IteratorSourceReader(SourceReaderContext context) {
+    public IteratorSourceReaderBase(SourceReaderContext context) {
         this.context = checkNotNull(context);
         this.availability = new CompletableFuture<>();
         this.remainingSplits = new ArrayDeque<>();
@@ -91,22 +91,30 @@ public class IteratorSourceReader<
         if (remainingSplits.isEmpty()) {
             context.sendSplitRequest();
         }
+        start(context);
     }
 
+    protected void start(SourceReaderContext context) {}
+
     @Override
-    public InputStatus pollNext(ReaderOutput<E> output) {
+    public InputStatus pollNext(ReaderOutput<O> output) {
         if (iterator != null) {
             if (iterator.hasNext()) {
-                output.collect(iterator.next());
+                output.collect(convert(iterator.next()));
                 return InputStatus.MORE_AVAILABLE;
             } else {
                 finishSplit();
             }
         }
-
-        return tryMoveToNextSplit();
+        final InputStatus inputStatus = tryMoveToNextSplit();
+        if (inputStatus == InputStatus.MORE_AVAILABLE) {
+            output.collect(convert(iterator.next()));
+        }
+        return inputStatus;
     }
 
+    protected abstract O convert(E value);
+
     private void finishSplit() {
         iterator = null;
         currentSplit = null;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
new file mode 100644
index 00000000000..914c58d43b7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of {@link RateLimiter} that completes a defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+@Internal
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        checkArgument(capacityPerCycle > 0, "Capacity per cycle has to be a 
positive number.");
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle;
+    }
+
+    transient CompletableFuture<Void> gatingFuture = null;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (gatingFuture == null) {
+            gatingFuture = CompletableFuture.completedFuture(null);
+        }
+        if (capacityLeft <= 0) {
+            gatingFuture = new CompletableFuture<>();
+        }
+        return gatingFuture.thenRun(() -> capacityLeft -= 1);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle;
+        gatingFuture.complete(null);
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
new file mode 100644
index 00000000000..7e32bc61a96
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
+@Internal
+public class GuavaRateLimiter
+        implements 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiter {
+
+    private final Executor limiter =
+            Executors.newSingleThreadExecutor(new 
ExecutorThreadFactory("flink-rate-limiter"));
+    private final RateLimiter rateLimiter;
+
+    public GuavaRateLimiter(double maxPerSecond) {
+        this.rateLimiter = RateLimiter.create(maxPerSecond);
+    }
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        return CompletableFuture.runAsync(rateLimiter::acquire, limiter);
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
new file mode 100644
index 00000000000..15938bbb81d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.concurrent.CompletionStage;
+
+/** A convenience implementation of {@link RateLimiter} that does not throttle 
requests. */
+@Internal
+public class NoOpRateLimiter implements RateLimiter {
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        return FutureUtils.completedVoidFuture();
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
new file mode 100644
index 00000000000..403ba36200c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Wraps the actual {@link SourceReader} and rate limits its data emission. */
+@Experimental
+public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
+        implements SourceReader<E, SplitT> {
+
+    private final SourceReader<E, SplitT> sourceReader;
+    private final RateLimiter rateLimiter;
+    private CompletableFuture<Void> availabilityFuture = null;
+
+    /**
+     * Instantiates a new rate-limited source reader.
+     *
+     * @param sourceReader The actual source reader.
+     * @param rateLimiter The rate limiter.
+     */
+    public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader, 
RateLimiter rateLimiter) {
+        checkNotNull(sourceReader);
+        checkNotNull(rateLimiter);
+        this.sourceReader = sourceReader;
+        this.rateLimiter = rateLimiter;
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void start() {
+        sourceReader.start();
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
+        // reset future because the next record may hit the rate limit
+        availabilityFuture = null;
+        final InputStatus inputStatus = sourceReader.pollNext(output);
+        if (inputStatus == InputStatus.MORE_AVAILABLE) {
+            // force another go through isAvailable() to evaluate rate-limiting
+            return InputStatus.NOTHING_AVAILABLE;
+        } else {
+            return inputStatus;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> isAvailable() {
+        if (availabilityFuture == null) {
+            availabilityFuture =
+                    rateLimiter
+                            .acquire()
+                            .toCompletableFuture()
+                            .thenCombine(sourceReader.isAvailable(), (l, r) -> 
null);
+        }
+        return availabilityFuture;
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splits) {
+        sourceReader.addSplits(splits);
+    }
+
+    @Override
+    public void notifyNoMoreSplits() {
+        sourceReader.notifyNoMoreSplits();
+    }
+
+    @Override
+    public List<SplitT> snapshotState(long checkpointId) {
+        return sourceReader.snapshotState(checkpointId);
+    }
+
+    @Override
+    public void close() throws Exception {
+        sourceReader.close();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        rateLimiter.notifyCheckpointComplete(checkpointId);
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
new file mode 100644
index 00000000000..a6bd004b894
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.concurrent.CompletionStage;
+
+/** The interface to rate limit execution of methods. */
+@NotThreadSafe
+@Experimental
+public interface RateLimiter {
+
+    /**
+     * Returns a future that is completed once another event would not exceed 
the rate limit. For
+     * correct functioning, the next invocation of this method should only 
happen after the
+     * previously returned future has been completed.
+     */
+    CompletionStage<Void> acquire();
+
+    /**
+     * Notifies this {@code RateLimiter} that the checkpoint with the given 
{@code checkpointId}
+     * completed and was committed. Makes it possible to implement rate 
limiters that control data
+     * emission per checkpoint cycle.
+     *
+     * @param checkpointId The ID of the checkpoint that has been completed.
+     */
+    default void notifyCheckpointComplete(long checkpointId) {}
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
new file mode 100644
index 00000000000..684e919a500
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A factory for {@link RateLimiter RateLimiters} which apply rate-limiting to 
a source sub-task.
+ */
+@Experimental
+public interface RateLimiterStrategy extends Serializable {
+
+    /**
+     * Creates a {@link RateLimiter} that lets records through with rate 
proportional to the
+     * parallelism. This method will be called once per source subtask. The 
cumulative rate over all
+     * rate limiters for a source must not exceed the rate limit configured 
for the strategy.
+     */
+    RateLimiter createRateLimiter(int parallelism);
+
+    /**
+     * Creates a {@code RateLimiterStrategy} that is limiting the number of 
records per second.
+     *
+     * @param recordsPerSecond The number of records produced per second. The 
actual number of
+     *     produced records is subject to rounding due to dividing the number 
of produced records
+     *     among the parallel instances.
+     */
+    static RateLimiterStrategy perSecond(double recordsPerSecond) {
+        return parallelism -> new GuavaRateLimiter(recordsPerSecond / 
parallelism);
+    }
+
+    /**
+     * Creates a {@code RateLimiterStrategy} that is limiting the number of 
records per checkpoint.
+     *
+     * @param recordsPerCheckpoint The number of records produced per 
checkpoint. This value has to
+     *     be greater or equal to parallelism. The actual number of produced 
records is subject to
+     *     rounding due to dividing the number of produced records among the 
parallel instances.
+     */
+    static RateLimiterStrategy perCheckpoint(int recordsPerCheckpoint) {
+        return parallelism -> {
+            int recordsPerSubtask = recordsPerCheckpoint / parallelism;
+            checkArgument(
+                    recordsPerSubtask > 0,
+                    "recordsPerCheckpoint has to be greater or equal to 
parallelism. "
+                            + "Either decrease the parallelism or increase the 
number of "
+                            + "recordsPerCheckpoint.");
+            return new GatedRateLimiter(recordsPerSubtask);
+        };
+    }
+
+    /** Creates a convenience {@code RateLimiterStrategy} that is not limiting 
the records rate. */
+    static RateLimiterStrategy noOp() {
+        return parallelism -> new NoOpRateLimiter();
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index 5944201d1ad..ce97ff71269 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -143,6 +143,11 @@ public class NumberSequenceSourceTest {
         public UserCodeClassLoader getUserCodeClassLoader() {
             return 
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
         }
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
     }
 
     private static final class TestingReaderOutput<E> implements 
ReaderOutput<E> {
diff --git a/flink-examples/flink-examples-streaming/pom.xml 
b/flink-examples/flink-examples-streaming/pom.xml
index f037f846070..ecba009e5cb 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -72,6 +72,12 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-datagen</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-shaded-jackson</artifactId>
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGenerator.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGenerator.java
new file mode 100644
index 00000000000..326e2a2a2a9
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.datagen;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** An example for generating data with a {@link DataGeneratorSource}. */
+public class DataGenerator {
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+
+        GeneratorFunction<Long, String> generatorFunction = index -> "Number: 
" + index;
+
+        DataGeneratorSource<String> generatorSource =
+                new DataGeneratorSource<>(
+                        generatorFunction,
+                        Long.MAX_VALUE,
+                        RateLimiterStrategy.perSecond(4),
+                        Types.STRING);
+
+        DataStreamSource<String> streamSource =
+                env.fromSource(generatorSource, 
WatermarkStrategy.noWatermarks(), "Data Generator");
+        streamSource.print();
+
+        env.execute("Data Generator Source Example");
+    }
+}
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java
new file mode 100644
index 00000000000..5eef73755c5
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.datagen;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** An example for generating specific data per checkpoint with a {@link 
DataGeneratorSource} . */
+public class DataGeneratorPerCheckpoint {
+
+    public static void main(String[] args) throws Exception {
+
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(3000);
+        env.setParallelism(1);
+
+        final String[] elements = new String[] {"a", "b", "c", "d", "e", "f", 
"g", "h", "i", "j"};
+        final int size = elements.length;
+        final GeneratorFunction<Long, String> generatorFunction =
+                index -> elements[(int) (index % size)];
+
+        final DataGeneratorSource<String> generatorSource =
+                new DataGeneratorSource<>(
+                        generatorFunction,
+                        Long.MAX_VALUE,
+                        RateLimiterStrategy.perCheckpoint(size),
+                        Types.STRING);
+
+        final DataStreamSource<String> streamSource =
+                env.fromSource(generatorSource, 
WatermarkStrategy.noWatermarks(), "Data Generator");
+        streamSource.print();
+
+        env.execute("Data Generator Source Example");
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index c90c30718e1..600570307ca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -33,8 +33,11 @@ import javax.annotation.Nullable;
 /**
  * A data generator source that abstract data generator. It can be used to 
easy startup/test for
  * streaming job and performance testing. It is stateful, re-scalable, 
possibly in parallel.
+ *
+ * @deprecated Use {@code 
org.apache.flink.connector.datagen.source.DataGeneratorSource} instead.
  */
 @Experimental
+@Deprecated
 public class DataGeneratorSource<T> extends RichParallelSourceFunction<T>
         implements CheckpointedFunction {
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 71e48d9a2d0..817d19dd795 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -297,6 +297,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                             }
                         };
                     }
+
+                    @Override
+                    public int currentParallelism() {
+                        return 
getRuntimeContext().getNumberOfParallelSubtasks();
+                    }
                 };
 
         sourceReader = readerFactory.apply(context);
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
index d83f8978684..4855ced967b 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
@@ -86,6 +86,11 @@ public class TestingReaderContext implements 
SourceReaderContext {
         return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
     }
 
+    @Override
+    public int currentParallelism() {
+        return 1;
+    }
+
     // ------------------------------------------------------------------------
 
     public int getNumSplitRequests() {
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 5fa72304080..c310ce0998b 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -65,6 +65,12 @@ under the License.
                        <type>test-jar</type>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-datagen</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiterTest.java
 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiterTest.java
new file mode 100644
index 00000000000..15a034e0f7d
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiterTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletionStage;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class GatedRateLimiterTest {
+
+    @Test
+    void testCapacityNotExceededOnCheckpoint() {
+        int capacityPerCycle = 5;
+
+        final GatedRateLimiter gatedRateLimiter = new 
GatedRateLimiter(capacityPerCycle);
+        for (int x = 0; x < capacityPerCycle; x++) {
+            assertThat(gatedRateLimiter.acquire()).isCompleted();
+        }
+
+        CompletionStage<Void> postInitialBatch = gatedRateLimiter.acquire();
+        assertThat(postInitialBatch).isNotCompleted();
+
+        gatedRateLimiter.notifyCheckpointComplete(0);
+
+        assertThat(postInitialBatch).isCompleted();
+        for (int x = 0; x < capacityPerCycle - 1; x++) {
+            assertThat(gatedRateLimiter.acquire()).isCompleted();
+        }
+
+        CompletionStage<Void> postCheckpoint = gatedRateLimiter.acquire();
+        assertThat(postCheckpoint).isNotCompleted();
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
new file mode 100644
index 00000000000..ed9df3f4a71
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** An integration test for rate limiting built into the DataGeneratorSource. 
*/
+public class RateLimitedSourceReaderITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @RegisterExtension
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    @DisplayName("Rate limiter is used correctly.")
+    public void testRateLimitingParallelExecution() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        final int count = 10;
+
+        final MockRateLimiterStrategy rateLimiterStrategy = new 
MockRateLimiterStrategy();
+
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(index -> index, 10, 
rateLimiterStrategy, Types.LONG);
+
+        final DataStream<Long> stream =
+                env.fromSource(
+                        dataGeneratorSource, WatermarkStrategy.noWatermarks(), 
"generator source");
+
+        final List<Long> result = stream.executeAndCollect(10000);
+        int rateLimiterCallCount = 
MockRateLimiterStrategy.getRateLimitersCallCount();
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 9));
+        assertThat(rateLimiterCallCount).isGreaterThan(count);
+    }
+
+    private List<Long> range(int startInclusive, int endInclusive) {
+        return LongStream.rangeClosed(startInclusive, endInclusive)
+                .boxed()
+                .collect(Collectors.toList());
+    }
+
+    private static final class MockRateLimiter implements RateLimiter {
+
+        int callCount;
+
+        @Override
+        public CompletableFuture<Void> acquire() {
+            callCount++;
+            return CompletableFuture.completedFuture(null);
+        }
+
+        public int getCallCount() {
+            return callCount;
+        }
+    }
+
+    private static class MockRateLimiterStrategy implements 
RateLimiterStrategy {
+
+        private static final List<MockRateLimiter> rateLimiters = new 
ArrayList<>();
+
+        @Override
+        public RateLimiter createRateLimiter(int parallelism) {
+            MockRateLimiter mockRateLimiter = new MockRateLimiter();
+            rateLimiters.add(mockRateLimiter);
+            return mockRateLimiter;
+        }
+
+        public static int getRateLimitersCallCount() {
+            return 
rateLimiters.stream().mapToInt(MockRateLimiter::getCallCount).sum();
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index b5e4ee04a28..4c4a173ef7e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2135,6 +2135,7 @@ under the License.
                                                                <!-- MARKER: 
start exclusions; these will be wiped by 
tools/releasing/update_japicmp_configuration.sh -->
                                                                <!-- 
UnionSerializerConfigSnapshot was a PublicEvolving and Deprecated class that 
has been removed, embedded inside a Public CoGroupedStreams class, triggering 
this false failure -->
                                                                
<exclude>org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializerConfigSnapshot</exclude>
+                                                               
<exclude>org.apache.flink.api.connector.source.SourceReaderContext#currentParallelism()</exclude>
                                                                <!-- MARKER: 
end exclusions -->
                                                        </excludes>
                                                        
<accessModifier>public</accessModifier>

Reply via email to