This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a5667e82e25 [FLINK-27919] Add FLIP-238 data generator source
a5667e82e25 is described below
commit a5667e82e25cb87dc5523b82b08aec3e1408e9c6
Author: Alexander Fedulov <[email protected]>
AuthorDate: Wed Nov 23 16:33:08 2022 +0100
[FLINK-27919] Add FLIP-238 data generator source
---
.../docs/connectors/datastream/datagen.md | 104 +++++++++
.../docs/connectors/datastream/overview.md | 1 +
docs/content/docs/connectors/datastream/datagen.md | 104 +++++++++
.../content/docs/connectors/datastream/overview.md | 1 +
.../18509c9e-3250-4c52-91b9-11ccefc85db1 | 2 +-
.../0fbe3123-5829-4891-93a5-a99bd8413fd9 | 0
.../5a661a23-5b47-407c-9994-b6215a46c45c | 0
.../archunit-violations/stored.rules | 4 +
flink-connectors/flink-connector-datagen/pom.xml | 78 +++++++
.../datagen/source/DataGeneratorSource.java | 195 ++++++++++++++++
.../source/GeneratingIteratorSourceReader.java | 78 +++++++
.../datagen/source/GeneratorFunction.java | 55 +++++
.../source/GeneratorSourceReaderFactory.java | 67 ++++++
.../src/main/resources/log4j2.properties | 25 +++
.../architecture/TestCodeArchitectureTest.java | 40 ++++
.../datagen/source/DataGeneratorSourceITCase.java | 250 +++++++++++++++++++++
.../datagen/source/DataGeneratorSourceTest.java | 91 +++++---
.../src/test/resources/archunit.properties | 31 +++
.../src/test/resources/log4j2-test.properties | 28 +++
.../file/src/FileSourceHeavyThroughputTest.java | 5 +
flink-connectors/pom.xml | 1 +
.../apache/flink/api/connector/source/Source.java | 16 +-
.../api/connector/source/SourceReaderContext.java | 9 +
.../api/connector/source/SourceReaderFactory.java | 42 ++++
.../source/lib/util/IteratorSourceReader.java | 134 +----------
...ceReader.java => IteratorSourceReaderBase.java} | 24 +-
.../source/util/ratelimit/GatedRateLimiter.java | 68 ++++++
.../source/util/ratelimit/GuavaRateLimiter.java | 48 ++++
.../source/util/ratelimit/NoOpRateLimiter.java | 34 +++
.../util/ratelimit/RateLimitedSourceReader.java | 110 +++++++++
.../source/util/ratelimit/RateLimiter.java | 47 ++++
.../source/util/ratelimit/RateLimiterStrategy.java | 73 ++++++
.../source/lib/NumberSequenceSourceTest.java | 5 +
flink-examples/flink-examples-streaming/pom.xml | 6 +
.../streaming/examples/datagen/DataGenerator.java | 51 +++++
.../datagen/DataGeneratorPerCheckpoint.java | 56 +++++
.../source/datagen/DataGeneratorSource.java | 3 +
.../streaming/api/operators/SourceOperator.java | 5 +
.../source/reader/TestingReaderContext.java | 5 +
flink-tests/pom.xml | 6 +
.../source/lib/util/GatedRateLimiterTest.java | 53 +++++
.../lib/util/RateLimitedSourceReaderITCase.java | 119 ++++++++++
pom.xml | 1 +
43 files changed, 1893 insertions(+), 182 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/datagen.md
b/docs/content.zh/docs/connectors/datastream/datagen.md
new file mode 100644
index 00000000000..af3129357ed
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/datagen.md
@@ -0,0 +1,104 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for
generating input data for
+Flink pipelines.
+It is useful when developing locally or demoing without access to external
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-----
+
+The `DataGeneratorSource` produces N data points in parallel. The source
splits the sequence
+into as many parallel sub-sequences as there are parallel source subtasks. It
drives the data
+generation process by supplying "index" values of type `Long` to the
user-provided
+{{< javadoc name="GeneratorFunction"
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of `Long`
values
+into the generated events of an arbitrary data type. For instance, the
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " +
index;
+long numberOfRecords = 1000;
+
+DataGeneratorSource<String> source =
+ new DataGeneratorSource<>(generatorFunction, numberOfRecords,
Types.STRING);
+
+DataStreamSource<String> stream =
+ env.fromSource(source,
+ WatermarkStrategy.noWatermarks(),
+ "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+Rate Limiting
+-----
+
+`DataGeneratorSource` has built-in support for rate limiting. The following
code will produce a stream of
+`Long` values at the overall source rate (across all source subtasks) not
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+double recordsPerSecond = 100;
+
+DataGeneratorSource<Long> source =
+ new DataGeneratorSource<>(
+ generatorFunction,
+ Long.MAX_VALUE,
+ RateLimiterStrategy.perSecond(recordsPerSecond),
+ Types.LONG);
+```
+
+Additional rate limiting strategies, such as limiting the number of records
emitted per checkpoint, can
+be found in {{< javadoc name="RateLimiterStrategy"
file="org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.html">}}.
+
+Boundedness
+-----
+This source is always bounded. From a practical perspective, however, setting
the number of records
+to `Long.MAX_VALUE` turns it into an effectively unbounded source (the end
will never be reached). For finite sequences users may want to consider running
the application in [`BATCH` execution mode]({{< ref
"docs/dev/datastream/execution_mode"
>}}#when-canshould-i-use-batch-execution-mode)
+.
+
+Notes
+-----
+
+{{< hint info >}}
+**Note:** `DataGeneratorSource` can be used to implement Flink jobs with
at-least-once and
+end-to-end exactly-once processing guarantees under the condition that the
output of the `GeneratorFunction`
+is deterministic with respect to its input, in other words supplying the same
`Long` number always
+leads to generating the same output.
+{{< /hint >}}
+
+{{< hint info >}}
+**Note:** it is possible to also produce deterministic watermarks right at the
+source based on the generated events and a custom {{< javadoc
name="WatermarkStrategy"
file="org/apache/flink/api/common/eventtime/WatermarkStrategy.html">}}.
+{{< /hint >}}
+
+
+
diff --git a/docs/content.zh/docs/connectors/datastream/overview.md
b/docs/content.zh/docs/connectors/datastream/overview.md
index d394302cf56..9c6ffe0af38 100644
--- a/docs/content.zh/docs/connectors/datastream/overview.md
+++ b/docs/content.zh/docs/connectors/datastream/overview.md
@@ -41,6 +41,7 @@ under the License.
* [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}})
(sink)
* [Amazon Kinesis Data Streams]({{< ref "docs/connectors/datastream/kinesis"
>}}) (source/sink)
* [Amazon Kinesis Data Firehose]({{< ref
"docs/connectors/datastream/firehose" >}}) (sink)
+ * [DataGen]({{< ref "docs/connectors/datastream/datagen" >}}) (source)
* [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}})
(sink)
* [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) (sink)
* [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
diff --git a/docs/content/docs/connectors/datastream/datagen.md
b/docs/content/docs/connectors/datastream/datagen.md
new file mode 100644
index 00000000000..3847982550e
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/datagen.md
@@ -0,0 +1,104 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for
generating input data for
+Flink pipelines.
+It is useful when developing locally or demoing without access to external
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-----
+
+The `DataGeneratorSource` produces N data points in parallel. The source
splits the sequence
+into as many parallel sub-sequences as there are parallel source subtasks. It
drives the data
+generation process by supplying "index" values of type `Long` to the
user-provided
+{{< javadoc name="GeneratorFunction"
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of `Long`
values
+into the generated events of an arbitrary data type. For instance, the
following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " +
index;
+long numberOfRecords = 1000;
+
+DataGeneratorSource<String> source =
+ new DataGeneratorSource<>(generatorFunction, numberOfRecords,
Types.STRING);
+
+DataStreamSource<String> stream =
+ env.fromSource(source,
+ WatermarkStrategy.noWatermarks(),
+ "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+Rate Limiting
+-----
+
+`DataGeneratorSource` has built-in support for rate limiting. The following
code will produce a stream of
+`Long` values at the overall source rate (across all source subtasks) not
exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+double recordsPerSecond = 100;
+
+DataGeneratorSource<Long> source =
+ new DataGeneratorSource<>(
+ generatorFunction,
+ Long.MAX_VALUE,
+ RateLimiterStrategy.perSecond(recordsPerSecond),
+ Types.LONG);
+```
+
+Additional rate limiting strategies, such as limiting the number of records
emitted per checkpoint, can
+be found in {{< javadoc name="RateLimiterStrategy"
file="org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.html">}}.
+
+Boundedness
+-----
+This source is always bounded. From a practical perspective, however, setting
the number of records
+to `Long.MAX_VALUE` turns it into an effectively unbounded source (the end
will never be reached). For finite sequences users may want to consider running
the application in [`BATCH` execution mode]({{< ref
"docs/dev/datastream/execution_mode"
>}}#when-canshould-i-use-batch-execution-mode)
+.
+
+Notes
+-----
+
+{{< hint info >}}
+**Note:** `DataGeneratorSource` can be used to implement Flink jobs with
at-least-once and
+end-to-end exactly-once processing guarantees under the condition that the
output of the `GeneratorFunction`
+is deterministic with respect to its input, in other words supplying the same
`Long` number always
+leads to generating the same output.
+{{< /hint >}}
+
+{{< hint info >}}
+**Note:** it is possible to also produce deterministic watermarks right at the
+source based on the generated events and a custom {{< javadoc
name="WatermarkStrategy"
file="org/apache/flink/api/common/eventtime/WatermarkStrategy.html">}}.
+{{< /hint >}}
+
+
+
diff --git a/docs/content/docs/connectors/datastream/overview.md
b/docs/content/docs/connectors/datastream/overview.md
index 8c6707b6095..65e61abd0f7 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -42,6 +42,7 @@ Connectors provide code for interfacing with various
third-party systems. Curren
* [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}})
(sink)
* [Amazon Kinesis Data Streams]({{< ref "docs/connectors/datastream/kinesis"
>}}) (source/sink)
* [Amazon Kinesis Data Firehose]({{< ref
"docs/connectors/datastream/firehose" >}}) (sink)
+ * [DataGen]({{< ref "docs/connectors/datastream/datagen" >}}) (source)
* [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}})
(sink)
* [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}})
(source/sink)
* [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
diff --git
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
index 3a09bcb8547..e2812cfad3a 100644
---
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
+++
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/18509c9e-3250-4c52-91b9-11ccefc85db1
@@ -43,7 +43,7 @@
org.apache.flink.api.connector.source.lib.NumberSequenceSource.getSplitSerialize
org.apache.flink.api.connector.source.lib.NumberSequenceSource.getSplitSerializer():
Returned leaf type org.apache.flink.core.io.SimpleVersionedSerializer does not
satisfy: reside outside of package 'org.apache.flink..' or reside in any
package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.connector.source.lib.NumberSequenceSource.restoreEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext,
java.util.Collection): Argument leaf type
org.apache.flink.api.connector.source.lib.NumberSequenceSource$NumberSequenceSplit
does not satisfy: reside outside of package 'org.apache.flink..' or reside in
any package ['..shaded..'] or annotated with @Public or annotated with
@Deprecated
org.apache.flink.api.connector.source.lib.NumberSequenceSource.restoreEnumerator(org.apache.flink.api.connector.source.SplitEnumeratorContext,
java.util.Collection): Returned leaf type
org.apache.flink.api.connector.source.lib.NumberSequenceSource$NumberSequenceSplit
does not satisfy: reside outside of package 'org.apache.flink..' or reside in
any package ['..shaded..'] or annotated with @Public or annotated with
@Deprecated
-org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(org.apache.flink.api.connector.source.ReaderOutput):
Returned leaf type org.apache.flink.core.io.InputStatus does not satisfy:
reside outside of package 'org.apache.flink..' or reside in any package
['..shaded..'] or annotated with @Public or annotated with @Deprecated
+org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(org.apache.flink.api.connector.source.ReaderOutput):
Returned leaf type org.apache.flink.core.io.InputStatus does not satisfy:
reside outside of package 'org.apache.flink..' or reside in any package
['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.java.DataSet.coGroup(org.apache.flink.api.java.DataSet):
Returned leaf type
org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets does
not satisfy: reside outside of package 'org.apache.flink..' or reside in any
package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.java.DataSet.write(org.apache.flink.api.common.io.FileOutputFormat,
java.lang.String, org.apache.flink.core.fs.FileSystem$WriteMode): Argument
leaf type org.apache.flink.core.fs.FileSystem$WriteMode does not satisfy:
reside outside of package 'org.apache.flink..' or reside in any package
['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.java.DataSet.writeAsCsv(java.lang.String,
java.lang.String, java.lang.String,
org.apache.flink.core.fs.FileSystem$WriteMode): Argument leaf type
org.apache.flink.core.fs.FileSystem$WriteMode does not satisfy: reside outside
of package 'org.apache.flink..' or reside in any package ['..shaded..'] or
annotated with @Public or annotated with @Deprecated
diff --git
a/flink-connectors/flink-connector-datagen/archunit-violations/0fbe3123-5829-4891-93a5-a99bd8413fd9
b/flink-connectors/flink-connector-datagen/archunit-violations/0fbe3123-5829-4891-93a5-a99bd8413fd9
new file mode 100644
index 00000000000..e69de29bb2d
diff --git
a/flink-connectors/flink-connector-datagen/archunit-violations/5a661a23-5b47-407c-9994-b6215a46c45c
b/flink-connectors/flink-connector-datagen/archunit-violations/5a661a23-5b47-407c-9994-b6215a46c45c
new file mode 100644
index 00000000000..e69de29bb2d
diff --git
a/flink-connectors/flink-connector-datagen/archunit-violations/stored.rules
b/flink-connectors/flink-connector-datagen/archunit-violations/stored.rules
new file mode 100644
index 00000000000..687f775c531
--- /dev/null
+++ b/flink-connectors/flink-connector-datagen/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Sun Oct 09 19:59:56 CEST 2022
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\
extension=5a661a23-5b47-407c-9994-b6215a46c45c
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\
ITCase=0fbe3123-5829-4891-93a5-a99bd8413fd9
diff --git a/flink-connectors/flink-connector-datagen/pom.xml
b/flink-connectors/flink-connector-datagen/pom.xml
new file mode 100644
index 00000000000..1d70814417f
--- /dev/null
+++ b/flink-connectors/flink-connector-datagen/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.17-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-connector-datagen</artifactId>
+ <name>Flink : Connectors : Datagen</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependency -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+
+ <!-- ArchUnit test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-architecture-tests-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
new file mode 100644
index 00000000000..8f128414efb
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as
there are parallel
+ * source readers.
+ *
+ * <p>Users can supply a {@code GeneratorFunction} for mapping the
(sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce
the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " +
index;
+ *
+ * DataGeneratorSource<String> source =
+ * new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ * env.fromSource(source,
+ * WatermarkStrategy.noWatermarks(),
+ * "Generator Source");
+ * }</pre>
+ *
+ * <p>The order of elements depends on the parallelism. Each sub-sequence will
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * <p>Note that this approach also makes it possible to produce deterministic
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * <p>This source has built-in support for rate limiting. The following code
will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never
be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100
events per second.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ * new DataGeneratorSource<>(
+ * generatorFunction,
+ * Long.MAX_VALUE,
+ * RateLimiterStrategy.perSecond(100),
+ * Types.LONG);
+ * }</pre>
+ *
+ * <p>This source is always bounded. For very long sequences (for example when
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, the
end bound is pretty
+ * far away.
+ */
+@Experimental
+public class DataGeneratorSource<OUT>
+ implements Source<OUT, NumberSequenceSplit,
Collection<NumberSequenceSplit>>,
+ ResultTypeQueryable<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SourceReaderFactory<OUT, NumberSequenceSplit>
sourceReaderFactory;
+ private final TypeInformation<OUT> typeInfo;
+
+ private final NumberSequenceSource numberSource;
+
+ /**
+ * Instantiates a new {@code DataGeneratorSource}.
+ *
+ * @param generatorFunction The {@code GeneratorFunction} function.
+ * @param count The number of generated data points.
+ * @param typeInfo The type of the produced data points.
+ */
+ public DataGeneratorSource(
+ GeneratorFunction<Long, OUT> generatorFunction,
+ long count,
+ TypeInformation<OUT> typeInfo) {
+ this(generatorFunction, count, RateLimiterStrategy.noOp(), typeInfo);
+ }
+
+ /**
+ * Instantiates a new {@code DataGeneratorSource}.
+ *
+ * @param generatorFunction The {@code GeneratorFunction} function.
+ * @param count The number of generated data points.
+ * @param rateLimiterStrategy The strategy for rate limiting.
+ * @param typeInfo The type of the produced data points.
+ */
+ public DataGeneratorSource(
+ GeneratorFunction<Long, OUT> generatorFunction,
+ long count,
+ RateLimiterStrategy rateLimiterStrategy,
+ TypeInformation<OUT> typeInfo) {
+ this(
+ new GeneratorSourceReaderFactory<>(generatorFunction,
rateLimiterStrategy),
+ count,
+ typeInfo);
+ ClosureCleaner.clean(
+ generatorFunction,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ ClosureCleaner.clean(
+ rateLimiterStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ }
+
+ private DataGeneratorSource(
+ SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
+ long count,
+ TypeInformation<OUT> typeInfo) {
+ this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
+ ClosureCleaner.clean(
+ sourceReaderFactory,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ this.typeInfo = checkNotNull(typeInfo);
+ this.numberSource = new NumberSequenceSource(0, count - 1);
+ }
+
+ // ------------------------------------------------------------------------
+ // source methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return typeInfo;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<OUT, NumberSequenceSplit>
createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return sourceReaderFactory.createReader(readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<NumberSequenceSplit,
Collection<NumberSequenceSplit>> restoreEnumerator(
+ SplitEnumeratorContext<NumberSequenceSplit> enumContext,
+ Collection<NumberSequenceSplit> checkpoint) {
+ return numberSource.restoreEnumerator(enumContext, checkpoint);
+ }
+
+ @Override
+ public SplitEnumerator<NumberSequenceSplit,
Collection<NumberSequenceSplit>> createEnumerator(
+ final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+ return numberSource.createEnumerator(enumContext);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer()
{
+ return numberSource.getSplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Collection<NumberSequenceSplit>>
+ getEnumeratorCheckpointSerializer() {
+ return numberSource.getEnumeratorCheckpointSerializer();
+ }
+}
diff --git
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratingIteratorSourceReader.java
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratingIteratorSourceReader.java
new file mode 100644
index 00000000000..b46f93c6b83
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratingIteratorSourceReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code SourceReader} that takes the values of an iterator, supplied via
an {@link
+ * IteratorSourceSplit}, and applies a {@link GeneratorFunction} to them to
perform arbitrary
+ * transformations.
+ */
+@Experimental
+public class GeneratingIteratorSourceReader<
+ E, O, IterT extends Iterator<E>, SplitT extends
IteratorSourceSplit<E, IterT>>
+ extends IteratorSourceReaderBase<E, O, IterT, SplitT> {
+
+ private final GeneratorFunction<E, O> generatorFunction;
+
+ public GeneratingIteratorSourceReader(
+ SourceReaderContext context, GeneratorFunction<E, O>
generatorFunction) {
+ super(context);
+ this.generatorFunction = checkNotNull(generatorFunction);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected O convert(E value) {
+ try {
+ return generatorFunction.map(value);
+ } catch (Exception e) {
+ String message =
+ String.format(
+ "A user-provided generator function threw an
exception on this input: %s",
+ value.toString());
+ throw new FlinkRuntimeException(message, e);
+ }
+ }
+
+ @Override
+ public void start(SourceReaderContext context) {
+ try {
+ generatorFunction.open(context);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to open the
GeneratorFunction", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ generatorFunction.close();
+ super.close();
+ }
+}
diff --git
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorFunction.java
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorFunction.java
new file mode 100644
index 00000000000..096be1fe277
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+/**
+ * Base interface for data generator functions. Data generator functions take
elements and transform
+ * them, element-wise. They are the core building block of the {@link
DataGeneratorSource} that
+ * drives the data generation process by supplying "index" values of type
Long. It makes it possible
+ * to produce specific elements at concrete positions of the generated data
stream.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " +
index;
+ * DataGeneratorSource<String> source =
+ * new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ * }</pre>
+ *
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@Experimental
+public interface GeneratorFunction<T, O> extends Function {
+
+ /**
+ * Initialization method for the function. It is called once before the
actual data mapping
+ * methods.
+ */
+ default void open(SourceReaderContext readerContext) throws Exception {}
+
+ /** Tear-down method for the function. */
+ default void close() throws Exception {}
+
+ O map(T value) throws Exception;
+}
diff --git
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
new file mode 100644
index 00000000000..f3888d3d1de
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for instantiating source readers that produce elements by
applying a user-supplied
+ * {@link GeneratorFunction}.
+ *
+ * @param <OUT> The type of the output elements.
+ */
+@Internal
+class GeneratorSourceReaderFactory<OUT>
+ implements SourceReaderFactory<OUT,
NumberSequenceSource.NumberSequenceSplit> {
+
+ private final GeneratorFunction<Long, OUT> generatorFunction;
+ private final RateLimiterStrategy rateLimiterStrategy;
+
+ /**
+ * Instantiates a new {@code GeneratorSourceReaderFactory}.
+ *
+ * @param generatorFunction The generator function.
+ * @param rateLimiterStrategy The rate limiter strategy.
+ */
+ public GeneratorSourceReaderFactory(
+ GeneratorFunction<Long, OUT> generatorFunction,
+ RateLimiterStrategy rateLimiterStrategy) {
+ this.generatorFunction = checkNotNull(generatorFunction);
+ this.rateLimiterStrategy = checkNotNull(rateLimiterStrategy);
+ }
+
+ @Override
+ public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit>
createReader(
+ SourceReaderContext readerContext) {
+ int parallelism = readerContext.currentParallelism();
+ RateLimiter rateLimiter =
rateLimiterStrategy.createRateLimiter(parallelism);
+ return new RateLimitedSourceReader<>(
+ new GeneratingIteratorSourceReader<>(readerContext,
generatorFunction),
+ rateLimiter);
+ }
+}
diff --git
a/flink-connectors/flink-connector-datagen/src/main/resources/log4j2.properties
b/flink-connectors/flink-connector-datagen/src/main/resources/log4j2.properties
new file mode 100644
index 00000000000..c64a340a8dc
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git
a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 00000000000..96669b8ac82
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/** Architecture tests for test code. */
+@AnalyzeClasses(
+ packages = "org.apache.flink.connector.datagen",
+ importOptions = {
+ ImportOption.OnlyIncludeTests.class,
+ ImportOptions.ExcludeScalaImportOption.class,
+ ImportOptions.ExcludeShadedImportOption.class
+ })
+public class TestCodeArchitectureTest {
+
+ @ArchTest
+ public static final ArchTests COMMON_TESTS =
ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git
a/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
new file mode 100644
index 00000000000..76f5eca8494
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static java.util.stream.Collectors.summingInt;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+class DataGeneratorSourceITCase extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+
+ @RegisterExtension
+ private static final MiniClusterExtension miniClusterExtension =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .build());
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ @DisplayName("Combined results of parallel source readers produce the
expected sequence.")
+ void testParallelSourceExecution() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ final DataStream<Long> stream = getGeneratorSourceStream(index ->
index, env, 1_000L);
+
+ final List<Long> result = stream.executeAndCollect(10000);
+
+ assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+ }
+
+ @Test
+ @DisplayName("Generator function can be instantiated as an anonymous
class.")
+ void testParallelSourceExecutionWithAnonymousClass() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ GeneratorFunction<Long, Long> generatorFunction =
+ new GeneratorFunction<Long, Long>() {
+
+ @Override
+ public Long map(Long value) {
+ return value;
+ }
+ };
+
+ final DataStream<Long> stream =
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+ final List<Long> result = stream.executeAndCollect(10000);
+
+ assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+ }
+
+ @Test
+ @DisplayName("Exceptions from the generator function are not 'swallowed'.")
+ void testFailingGeneratorFunction() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ GeneratorFunction<Long, Long> generatorFunction =
+ value -> {
+ throw new Exception("boom");
+ };
+
+ final DataStream<Long> stream =
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+ assertThatThrownBy(
+ () -> {
+ stream.executeAndCollect(10000);
+ })
+ .satisfies(anyCauseMatches("exception on this input:"))
+ .satisfies(anyCauseMatches("boom"));
+ }
+
+ @Test
+ @DisplayName("Exceptions from the generator function initialization are
not 'swallowed'.")
+    // FIXME: failure details are swallowed by Flink
+ // Full details are still available at this line:
+ //
https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L758
+ // But the execution falls through to the line below and discards the root
cause of
+ // cancelling the source invokable without recording it:
+ //
https://github.com/apache/flink/blob/bccecc23067eb7f18e20bade814be73393401be5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L780
+ @Disabled
+ void testFailingGeneratorFunctionInitialization() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ GeneratorFunction<Long, Long> generatorFunctionFailingInit =
+ new GeneratorFunction<Long, Long>() {
+ @Override
+ public void open(SourceReaderContext readerContext) throws
Exception {
+ throw new Exception("boom");
+ }
+
+ @Override
+ public Long map(Long value) {
+ return value;
+ }
+ };
+
+ final DataStream<Long> stream =
+ getGeneratorSourceStream(generatorFunctionFailingInit, env,
1_000L);
+
+ assertThatThrownBy(
+ () -> {
+ stream.executeAndCollect(10000);
+ })
+ .satisfies(anyCauseMatches("Failed to open"))
+ .satisfies(anyCauseMatches("boom"));
+ }
+
+ @Test
+ @DisplayName(
+ "Result is correct when less elements are expected than the number
of parallel source readers")
+ void testLessSplitsThanParallelism() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ int n = PARALLELISM - 2;
+ DataStream<Long> stream = getGeneratorSourceStream(index -> index,
env, n).map(l -> l);
+
+ List<Long> result = stream.executeAndCollect(100);
+
+ assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, n -
1));
+ }
+
+ @Test
+ @DisplayName("Test GatedRateLimiter")
+ void testGatedRateLimiter() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(100);
+
+ env.setParallelism(PARALLELISM);
+
+ int capacityPerSubtaskPerCycle = 2;
+ int capacityPerCycle = // avoid rounding errors when spreading records
among subtasks
+ PARALLELISM * capacityPerSubtaskPerCycle;
+
+ final GeneratorFunction<Long, Long> generatorFunction = index -> 1L;
+
+ // Allow each subtask to produce at least 3 cycles, gated by
checkpoints
+ int count = capacityPerCycle * 3;
+ final DataGeneratorSource<Long> generatorSource =
+ new DataGeneratorSource<>(
+ generatorFunction,
+ count,
+ RateLimiterStrategy.perCheckpoint(capacityPerCycle),
+ Types.LONG);
+
+ final DataStreamSource<Long> streamSource =
+ env.fromSource(generatorSource,
WatermarkStrategy.noWatermarks(), "Data Generator");
+ final DataStream<Tuple2<Integer, Long>> map =
+ streamSource.map(new SubtaskAndCheckpointMapper());
+ final List<Tuple2<Integer, Long>> results =
map.executeAndCollect(1000);
+
+ final Map<Tuple2<Integer, Long>, Integer> collect =
+ results.stream()
+ .collect(
+ Collectors.groupingBy(
+ x -> (new Tuple2<>(x.f0, x.f1)),
summingInt(x -> 1)));
+ for (Map.Entry<Tuple2<Integer, Long>, Integer> entry :
collect.entrySet()) {
+ assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle);
+ }
+ }
+
+ private static class SubtaskAndCheckpointMapper
+ extends RichMapFunction<Long, Tuple2<Integer, Long>> implements
CheckpointListener {
+
+ private long checkpointId = 0;
+ private int subtaskIndex;
+
+ @Override
+ public void open(Configuration parameters) {
+ subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ }
+
+ @Override
+ public Tuple2<Integer, Long> map(Long value) {
+ return new Tuple2<>(subtaskIndex, checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ this.checkpointId = checkpointId;
+ }
+ }
+
+ private DataStream<Long> getGeneratorSourceStream(
+ GeneratorFunction<Long, Long> generatorFunction,
+ StreamExecutionEnvironment env,
+ long count) {
+ DataGeneratorSource<Long> dataGeneratorSource =
+ new DataGeneratorSource<>(generatorFunction, count,
Types.LONG);
+
+ return env.fromSource(
+ dataGeneratorSource, WatermarkStrategy.noWatermarks(),
"generator source");
+ }
+
+ private List<Long> range(int startInclusive, int endInclusive) {
+ return LongStream.rangeClosed(startInclusive, endInclusive)
+ .boxed()
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
similarity index 63%
copy from
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
copy to
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
index 5944201d1ad..2caf93f6e08 100644
---
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++
b/flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
@@ -16,14 +16,18 @@
* limitations under the License.
*/
-package org.apache.flink.api.connector.source.lib;
+package org.apache.flink.connector.datagen.source;
import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
@@ -31,19 +35,57 @@ import
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
-import org.junit.Test;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.stream.LongStream;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for the {@link NumberSequenceSource}. */
-public class NumberSequenceSourceTest {
+/** Tests for the {@link DataGeneratorSource}. */
+class DataGeneratorSourceTest {
@Test
- public void testReaderCheckpoints() throws Exception {
+ @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+ void testRestoreEnumerator() throws Exception {
+ final GeneratorFunction<Long, Long> generatorFunctionStateless = index
-> index;
+ final DataGeneratorSource<Long> dataGeneratorSource =
+ new DataGeneratorSource<>(generatorFunctionStateless, 100,
Types.LONG);
+
+ final int parallelism = 2;
+ final
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+ new MockSplitEnumeratorContext<>(parallelism);
+
+ SplitEnumerator<
+ NumberSequenceSource.NumberSequenceSplit,
+ Collection<NumberSequenceSource.NumberSequenceSplit>>
+ enumerator = dataGeneratorSource.createEnumerator(context);
+
+ // start() is not strictly necessary in the current implementation,
but should logically be
+ // executed in this order (protect against any breaking changes in the
start() method).
+ enumerator.start();
+
+ Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+ enumerator.snapshotState(0);
+ assertThat(enumeratorState).hasSize(parallelism);
+
+ enumerator = dataGeneratorSource.restoreEnumerator(context,
enumeratorState);
+
+ // Verify that splits were restored and can be assigned
+ assertThat(context.getSplitsAssignmentSequence()).isEmpty();
+ for (NumberSequenceSource.NumberSequenceSplit ignored :
enumeratorState) {
+ enumerator.handleSplitRequest(0, "hostname");
+ }
+
assertThat(context.getSplitsAssignmentSequence()).hasSize(enumeratorState.size());
+ }
+
+ @Test
+ @DisplayName("Uses the underlying NumberSequenceSource correctly for
checkpointing.")
+ void testReaderCheckpoints() throws Exception {
final long from = 177;
final long mid = 333;
final long to = 563;
@@ -75,33 +117,19 @@ public class NumberSequenceSourceTest {
}
final List<Long> result = out.getEmittedRecords();
- validateSequence(result, from, to);
- }
-
- private static void validateSequence(
- final List<Long> sequence, final long from, final long to) {
- if (sequence.size() != to - from + 1) {
- failSequence(sequence, from, to);
- }
+ final Iterable<Long> expected = LongStream.range(from, to +
1)::iterator;
- long nextExpected = from;
- for (Long next : sequence) {
- if (next != nextExpected++) {
- failSequence(sequence, from, to);
- }
- }
- }
-
- private static void failSequence(final List<Long> sequence, final long
from, final long to) {
- fail(
- String.format(
- "Expected: A sequence [%d, %d], but found: sequence
(size %d) : %s",
- from, to, sequence.size(), sequence));
+ assertThat(result).containsExactlyElementsOf(expected);
}
- private static SourceReader<Long,
NumberSequenceSource.NumberSequenceSplit> createReader() {
+ private static SourceReader<Long,
NumberSequenceSource.NumberSequenceSplit> createReader()
+ throws Exception {
// the arguments passed in the source constructor matter only to the
enumerator
- return new NumberSequenceSource(0L, 0L).createReader(new
DummyReaderContext());
+ GeneratorFunction<Long, Long> generatorFunctionStateless = index ->
index;
+ DataGeneratorSource<Long> dataGeneratorSource =
+ new DataGeneratorSource<>(generatorFunctionStateless,
Long.MAX_VALUE, Types.LONG);
+
+ return dataGeneratorSource.createReader(new DummyReaderContext());
}
// ------------------------------------------------------------------------
@@ -143,6 +171,11 @@ public class NumberSequenceSourceTest {
public UserCodeClassLoader getUserCodeClassLoader() {
return
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
}
private static final class TestingReaderOutput<E> implements
ReaderOutput<E> {
diff --git
a/flink-connectors/flink-connector-datagen/src/test/resources/archunit.properties
b/flink-connectors/flink-connector-datagen/src/test/resources/archunit.properties
new file mode 100644
index 00000000000..15be88c95ba
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new
violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the
initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was
correct to flag a new
+# violation, please try to avoid creating the violation. If the
violation was created due to a
+# shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git
a/flink-connectors/flink-connector-datagen/src/test/resources/log4j2-test.properties
b/flink-connectors/flink-connector-datagen/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..c4fa18706ff
--- /dev/null
+++
b/flink-connectors/flink-connector-datagen/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index 3c175a80c8d..bb44462ab05 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -225,6 +225,11 @@ class FileSourceHeavyThroughputTest {
public UserCodeClassLoader getUserCodeClassLoader() {
return
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
}
private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 6c2a814430b..f05ff1e1b7c 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -53,6 +53,7 @@ under the License.
<module>flink-file-sink-common</module>
<module>flink-connector-files</module>
<module>flink-connector-pulsar</module>
+ <module>flink-connector-datagen</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end
up
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
index 899469f00ed..096e0d67376 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
@@ -21,8 +21,6 @@ package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import java.io.Serializable;
-
/**
* The interface for Source. It acts like a factory class that helps construct
the {@link
* SplitEnumerator} and {@link SourceReader} and corresponding serializers.
@@ -32,7 +30,8 @@ import java.io.Serializable;
* @param <EnumChkT> The type of the enumerator checkpoints.
*/
@Public
-public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends
Serializable {
+public interface Source<T, SplitT extends SourceSplit, EnumChkT>
+ extends SourceReaderFactory<T, SplitT> {
/**
* Get the boundedness of this source.
@@ -41,17 +40,6 @@ public interface Source<T, SplitT extends SourceSplit,
EnumChkT> extends Seriali
*/
Boundedness getBoundedness();
- /**
- * Creates a new reader to read data from the splits it gets assigned. The
reader starts fresh
- * and does not have any state to resume.
- *
- * @param readerContext The {@link SourceReaderContext context} for the
source reader.
- * @return A new SourceReader.
- * @throws Exception The implementor is free to forward all exceptions
directly. Exceptions
- * thrown from this method cause task failure/recovery.
- */
- SourceReader<T, SplitT> createReader(SourceReaderContext readerContext)
throws Exception;
-
/**
* Creates a new SplitEnumerator for this source, starting a new input.
*
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 0d25ec6bb3d..08c64501d9a 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -63,4 +63,13 @@ public interface SourceReaderContext {
* @see UserCodeClassLoader
*/
UserCodeClassLoader getUserCodeClassLoader();
+
    /**
     * Get the current parallelism of this Source.
     *
     * <p>The default implementation throws {@link UnsupportedOperationException} so that
     * pre-existing {@code SourceReaderContext} implementations stay source-compatible; contexts
     * that support this feature must override it.
     *
     * @return the parallelism of the Source.
     * @throws UnsupportedOperationException if this context does not support the operation.
     */
    default int currentParallelism() {
        throw new UnsupportedOperationException();
    }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java
new file mode 100644
index 00000000000..e974def7b03
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+
/**
 * A factory for creating source reader instances.
 *
 * <p>Serializable so that factory instances can be shipped to the task managers as part of a job
 * graph.
 *
 * @param <T> The type of the output elements.
 * @param <SplitT> The type of the source splits handed to the created readers.
 */
@Public
public interface SourceReaderFactory<T, SplitT extends SourceSplit> extends Serializable {
    /**
     * Creates a new reader to read data from the splits it gets assigned. The reader starts fresh
     * and does not have any state to resume.
     *
     * @param readerContext The {@link SourceReaderContext context} for the source reader.
     * @return A new SourceReader.
     * @throws Exception The implementor is free to forward all exceptions directly. Exceptions
     *     thrown from this method cause task failure/recovery.
     */
    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception;
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index d7a63c06a18..f6a567e90b9 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -19,22 +19,10 @@
package org.apache.flink.api.connector.source.lib.util;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.core.io.InputStatus;
-import javax.annotation.Nullable;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link SourceReader} that returns the values of an iterator, supplied via
an {@link
@@ -52,128 +40,14 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
@Public
public class IteratorSourceReader<
E, IterT extends Iterator<E>, SplitT extends
IteratorSourceSplit<E, IterT>>
- implements SourceReader<E, SplitT> {
-
- /** The context for this reader, to communicate with the enumerator. */
- private final SourceReaderContext context;
-
- /** The availability future. This reader is available as soon as a split
is assigned. */
- private CompletableFuture<Void> availability;
-
- /**
- * The iterator producing data. Non-null after a split has been assigned.
This field is null or
- * non-null always together with the {@link #currentSplit} field.
- */
- @Nullable private IterT iterator;
-
- /**
- * The split whose data we return. Non-null after a split has been
assigned. This field is null
- * or non-null always together with the {@link #iterator} field.
- */
- @Nullable private SplitT currentSplit;
-
- /** The remaining splits that were assigned but not yet processed. */
- private final Queue<SplitT> remainingSplits;
-
- private boolean noMoreSplits;
+ extends IteratorSourceReaderBase<E, E, IterT, SplitT> {
public IteratorSourceReader(SourceReaderContext context) {
- this.context = checkNotNull(context);
- this.availability = new CompletableFuture<>();
- this.remainingSplits = new ArrayDeque<>();
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void start() {
- // request a split if we don't have one
- if (remainingSplits.isEmpty()) {
- context.sendSplitRequest();
- }
+ super(context);
}
@Override
- public InputStatus pollNext(ReaderOutput<E> output) {
- if (iterator != null) {
- if (iterator.hasNext()) {
- output.collect(iterator.next());
- return InputStatus.MORE_AVAILABLE;
- } else {
- finishSplit();
- }
- }
-
- return tryMoveToNextSplit();
+ protected E convert(E value) {
+ return value;
}
-
- private void finishSplit() {
- iterator = null;
- currentSplit = null;
-
- // request another split if no other is left
- // we do this only here in the finishSplit part to avoid requesting a
split
- // whenever the reader is polled and doesn't currently have a split
- if (remainingSplits.isEmpty() && !noMoreSplits) {
- context.sendSplitRequest();
- }
- }
-
- private InputStatus tryMoveToNextSplit() {
- currentSplit = remainingSplits.poll();
- if (currentSplit != null) {
- iterator = currentSplit.getIterator();
- return InputStatus.MORE_AVAILABLE;
- } else if (noMoreSplits) {
- return InputStatus.END_OF_INPUT;
- } else {
- // ensure we are not called in a loop by resetting the
availability future
- if (availability.isDone()) {
- availability = new CompletableFuture<>();
- }
-
- return InputStatus.NOTHING_AVAILABLE;
- }
- }
-
- @Override
- public CompletableFuture<Void> isAvailable() {
- return availability;
- }
-
- @Override
- public void addSplits(List<SplitT> splits) {
- remainingSplits.addAll(splits);
- // set availability so that pollNext is actually called
- availability.complete(null);
- }
-
- @Override
- public void notifyNoMoreSplits() {
- noMoreSplits = true;
- // set availability so that pollNext is actually called
- availability.complete(null);
- }
-
- @Override
- public List<SplitT> snapshotState(long checkpointId) {
- if (currentSplit == null && remainingSplits.isEmpty()) {
- return Collections.emptyList();
- }
-
- final ArrayList<SplitT> allSplits = new ArrayList<>(1 +
remainingSplits.size());
- if (iterator != null && iterator.hasNext()) {
- assert currentSplit != null;
-
- @SuppressWarnings("unchecked")
- final SplitT inProgressSplit =
- (SplitT) currentSplit.getUpdatedSplitForIterator(iterator);
- allSplits.add(inProgressSplit);
- }
- allSplits.addAll(remainingSplits);
- return allSplits;
- }
-
- @Override
- public void close() throws Exception {}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java
similarity index 89%
copy from
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
copy to
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java
index d7a63c06a18..1e6237881a5 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java
@@ -50,9 +50,9 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* the iterator that produces this reader's elements.
*/
@Public
-public class IteratorSourceReader<
- E, IterT extends Iterator<E>, SplitT extends
IteratorSourceSplit<E, IterT>>
- implements SourceReader<E, SplitT> {
+public abstract class IteratorSourceReaderBase<
+ E, O, IterT extends Iterator<E>, SplitT extends
IteratorSourceSplit<E, IterT>>
+ implements SourceReader<O, SplitT> {
/** The context for this reader, to communicate with the enumerator. */
private final SourceReaderContext context;
@@ -77,7 +77,7 @@ public class IteratorSourceReader<
private boolean noMoreSplits;
- public IteratorSourceReader(SourceReaderContext context) {
+ public IteratorSourceReaderBase(SourceReaderContext context) {
this.context = checkNotNull(context);
this.availability = new CompletableFuture<>();
this.remainingSplits = new ArrayDeque<>();
@@ -91,22 +91,30 @@ public class IteratorSourceReader<
if (remainingSplits.isEmpty()) {
context.sendSplitRequest();
}
+ start(context);
}
+ protected void start(SourceReaderContext context) {}
+
@Override
- public InputStatus pollNext(ReaderOutput<E> output) {
+ public InputStatus pollNext(ReaderOutput<O> output) {
if (iterator != null) {
if (iterator.hasNext()) {
- output.collect(iterator.next());
+ output.collect(convert(iterator.next()));
return InputStatus.MORE_AVAILABLE;
} else {
finishSplit();
}
}
-
- return tryMoveToNextSplit();
+ final InputStatus inputStatus = tryMoveToNextSplit();
+ if (inputStatus == InputStatus.MORE_AVAILABLE) {
+ output.collect(convert(iterator.next()));
+ }
+ return inputStatus;
}
+ protected abstract O convert(E value);
+
private void finishSplit() {
iterator = null;
currentSplit = null;
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
new file mode 100644
index 00000000000..914c58d43b7
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GatedRateLimiter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of
futures in-between the
+ * external notification events. The first cycle completes immediately,
without waiting for the
+ * external notifications.
+ */
+@Internal
+public class GatedRateLimiter implements RateLimiter {
+
+ private final int capacityPerCycle;
+ private int capacityLeft;
+
+ /**
+ * Instantiates a new GatedRateLimiter.
+ *
+ * @param capacityPerCycle The number of completed futures per cycle.
+ */
+ public GatedRateLimiter(int capacityPerCycle) {
+ checkArgument(capacityPerCycle > 0, "Capacity per cycle has to be a
positive number.");
+ this.capacityPerCycle = capacityPerCycle;
+ this.capacityLeft = capacityPerCycle;
+ }
+
+ transient CompletableFuture<Void> gatingFuture = null;
+
+ @Override
+ public CompletionStage<Void> acquire() {
+ if (gatingFuture == null) {
+ gatingFuture = CompletableFuture.completedFuture(null);
+ }
+ if (capacityLeft <= 0) {
+ gatingFuture = new CompletableFuture<>();
+ }
+ return gatingFuture.thenRun(() -> capacityLeft -= 1);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ capacityLeft = capacityPerCycle;
+ gatingFuture.complete(null);
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
new file mode 100644
index 00000000000..7e32bc61a96
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/GuavaRateLimiter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
@Internal
public class GuavaRateLimiter
        implements org.apache.flink.api.connector.source.util.ratelimit.RateLimiter {

    // Dedicated single-thread executor on which the blocking Guava acquire() call runs, so that
    // callers of acquire() are never blocked themselves.
    // NOTE(review): this executor is never shut down; presumably the factory creates daemon
    // threads so the JVM can still exit — confirm against ExecutorThreadFactory.
    private final Executor limiter =
            Executors.newSingleThreadExecutor(new ExecutorThreadFactory("flink-rate-limiter"));

    // Guava token-bucket limiter configured with the permitted rate in permits per second.
    private final RateLimiter rateLimiter;

    public GuavaRateLimiter(double maxPerSecond) {
        this.rateLimiter = RateLimiter.create(maxPerSecond);
    }

    @Override
    public CompletionStage<Void> acquire() {
        // Completes once a permit was obtained; the blocking wait happens on the limiter
        // executor, not on the calling thread.
        return CompletableFuture.runAsync(rateLimiter::acquire, limiter);
    }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
new file mode 100644
index 00000000000..15938bbb81d
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.concurrent.CompletionStage;
+
/** A convenience implementation of {@link RateLimiter} that does not throttle requests. */
@Internal
public class NoOpRateLimiter implements RateLimiter {

    @Override
    public CompletionStage<Void> acquire() {
        // Always ready: return an already-completed future so callers never wait.
        return FutureUtils.completedVoidFuture();
    }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
new file mode 100644
index 00000000000..403ba36200c
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimitedSourceReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Wraps the actual {@link SourceReader} and rate limits its data emission. */
+@Experimental
+public class RateLimitedSourceReader<E, SplitT extends SourceSplit>
+ implements SourceReader<E, SplitT> {
+
+ private final SourceReader<E, SplitT> sourceReader;
+ private final RateLimiter rateLimiter;
+ private CompletableFuture<Void> availabilityFuture = null;
+
+ /**
+ * Instantiates a new rate-limited source reader.
+ *
+ * @param sourceReader The actual source reader.
+ * @param rateLimiter The rate limiter.
+ */
+ public RateLimitedSourceReader(SourceReader<E, SplitT> sourceReader,
RateLimiter rateLimiter) {
+ checkNotNull(sourceReader);
+ checkNotNull(rateLimiter);
+ this.sourceReader = sourceReader;
+ this.rateLimiter = rateLimiter;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void start() {
+ sourceReader.start();
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
+ // reset future because the next record may hit the rate limit
+ availabilityFuture = null;
+ final InputStatus inputStatus = sourceReader.pollNext(output);
+ if (inputStatus == InputStatus.MORE_AVAILABLE) {
+ // force another go through isAvailable() to evaluate rate-limiting
+ return InputStatus.NOTHING_AVAILABLE;
+ } else {
+ return inputStatus;
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ if (availabilityFuture == null) {
+ availabilityFuture =
+ rateLimiter
+ .acquire()
+ .toCompletableFuture()
+ .thenCombine(sourceReader.isAvailable(), (l, r) ->
null);
+ }
+ return availabilityFuture;
+ }
+
+ @Override
+ public void addSplits(List<SplitT> splits) {
+ sourceReader.addSplits(splits);
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ sourceReader.notifyNoMoreSplits();
+ }
+
+ @Override
+ public List<SplitT> snapshotState(long checkpointId) {
+ return sourceReader.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void close() throws Exception {
+ sourceReader.close();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ rateLimiter.notifyCheckpointComplete(checkpointId);
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
new file mode 100644
index 00000000000..a6bd004b894
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.concurrent.CompletionStage;
+
/** The interface to rate limit execution of methods. */
@NotThreadSafe
@Experimental
public interface RateLimiter {

    /**
     * Returns a future that is completed once another event would not exceed the rate limit. For
     * correct functioning, the next invocation of this method should only happen after the
     * previously returned future has been completed.
     */
    CompletionStage<Void> acquire();

    /**
     * Notifies this {@code RateLimiter} that the checkpoint with the given {@code checkpointId}
     * completed and was committed. Makes it possible to implement rate limiters that control data
     * emission per checkpoint cycle.
     *
     * <p>The default implementation is a no-op, for limiters that do not track checkpoints.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     */
    default void notifyCheckpointComplete(long checkpointId) {}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
new file mode 100644
index 00000000000..684e919a500
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A factory for {@link RateLimiter RateLimiters} which apply rate-limiting to
a source sub-task.
+ */
+@Experimental
+public interface RateLimiterStrategy extends Serializable {
+
+ /**
+ * Creates a {@link RateLimiter} that lets records through with rate
proportional to the
+ * parallelism. This method will be called once per source subtask. The
cumulative rate over all
+ * rate limiters for a source must not exceed the rate limit configured
for the strategy.
+ */
+ RateLimiter createRateLimiter(int parallelism);
+
+ /**
+ * Creates a {@code RateLimiterStrategy} that is limiting the number of
records per second.
+ *
+ * @param recordsPerSecond The number of records produced per second. The
actual number of
+ * produced records is subject to rounding due to dividing the number
of produced records
+ * among the parallel instances.
+ */
+ static RateLimiterStrategy perSecond(double recordsPerSecond) {
+ return parallelism -> new GuavaRateLimiter(recordsPerSecond /
parallelism);
+ }
+
+ /**
+ * Creates a {@code RateLimiterStrategy} that is limiting the number of
records per checkpoint.
+ *
+ * @param recordsPerCheckpoint The number of records produced per
checkpoint. This value has to
+ * be greater or equal to parallelism. The actual number of produced
records is subject to
+ * rounding due to dividing the number of produced records among the
parallel instances.
+ */
+ static RateLimiterStrategy perCheckpoint(int recordsPerCheckpoint) {
+ return parallelism -> {
+ int recordsPerSubtask = recordsPerCheckpoint / parallelism;
+ checkArgument(
+ recordsPerSubtask > 0,
+ "recordsPerCheckpoint has to be greater or equal to
parallelism. "
+ + "Either decrease the parallelism or increase the
number of "
+ + "recordsPerCheckpoint.");
+ return new GatedRateLimiter(recordsPerSubtask);
+ };
+ }
+
+ /** Creates a convenience {@code RateLimiterStrategy} that is not limiting
the records rate. */
+ static RateLimiterStrategy noOp() {
+ return parallelism -> new NoOpRateLimiter();
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index 5944201d1ad..ce97ff71269 100644
---
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -143,6 +143,11 @@ public class NumberSequenceSourceTest {
public UserCodeClassLoader getUserCodeClassLoader() {
return
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
}
private static final class TestingReaderOutput<E> implements
ReaderOutput<E> {
diff --git a/flink-examples/flink-examples-streaming/pom.xml
b/flink-examples/flink-examples-streaming/pom.xml
index f037f846070..ecba009e5cb 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -72,6 +72,12 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-datagen</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
diff --git
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGenerator.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGenerator.java
new file mode 100644
index 00000000000..326e2a2a2a9
--- /dev/null
+++
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.datagen;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
/** An example for generating data with a {@link DataGeneratorSource}. */
public class DataGenerator {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Maps the generator's Long index to the record that is emitted downstream.
        GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;

        DataGeneratorSource<String> generatorSource =
                new DataGeneratorSource<>(
                        generatorFunction,
                        Long.MAX_VALUE, // effectively unbounded record count
                        // 4 records/second in total, shared across the parallel subtasks
                        RateLimiterStrategy.perSecond(4),
                        Types.STRING);

        DataStreamSource<String> streamSource =
                env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
        streamSource.print();

        env.execute("Data Generator Source Example");
    }
}
diff --git
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java
new file mode 100644
index 00000000000..5eef73755c5
--- /dev/null
+++
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.datagen;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** An example for generating specific data per checkpoint with a {@link
DataGeneratorSource} . */
+public class DataGeneratorPerCheckpoint {
+
+ public static void main(String[] args) throws Exception {
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(3000);
+ env.setParallelism(1);
+
+ final String[] elements = new String[] {"a", "b", "c", "d", "e", "f",
"g", "h", "i", "j"};
+ final int size = elements.length;
+ final GeneratorFunction<Long, String> generatorFunction =
+ index -> elements[(int) (index % size)];
+
+ final DataGeneratorSource<String> generatorSource =
+ new DataGeneratorSource<>(
+ generatorFunction,
+ Long.MAX_VALUE,
+ RateLimiterStrategy.perCheckpoint(size),
+ Types.STRING);
+
+ final DataStreamSource<String> streamSource =
+ env.fromSource(generatorSource,
WatermarkStrategy.noWatermarks(), "Data Generator");
+ streamSource.print();
+
+ env.execute("Data Generator Source Example");
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index c90c30718e1..600570307ca 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -33,8 +33,11 @@ import javax.annotation.Nullable;
 /**
  * A data generator source that abstracts the data generation. It can be used for easy
  * startup/testing of streaming jobs and for performance testing. It is stateful, re-scalable,
  * and can run in parallel.
+ *
+ * @deprecated Use {@code
org.apache.flink.connector.datagen.source.DataGeneratorSource} instead.
*/
@Experimental
+@Deprecated
public class DataGeneratorSource<T> extends RichParallelSourceFunction<T>
implements CheckpointedFunction {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 71e48d9a2d0..817d19dd795 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -297,6 +297,11 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
}
};
}
+
+ @Override
+ public int currentParallelism() {
+ return
getRuntimeContext().getNumberOfParallelSubtasks();
+ }
};
sourceReader = readerFactory.apply(context);
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
index d83f8978684..4855ced967b 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
@@ -86,6 +86,11 @@ public class TestingReaderContext implements
SourceReaderContext {
return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
+
// ------------------------------------------------------------------------
public int getNumSplitRequests() {
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 5fa72304080..c310ce0998b 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -65,6 +65,12 @@ under the License.
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-datagen</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
diff --git
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiterTest.java
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiterTest.java
new file mode 100644
index 00000000000..15a034e0f7d
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiterTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletionStage;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class GatedRateLimiterTest {
+
+ @Test
+ void testCapacityNotExceededOnCheckpoint() {
+ int capacityPerCycle = 5;
+
+ final GatedRateLimiter gatedRateLimiter = new
GatedRateLimiter(capacityPerCycle);
+ for (int x = 0; x < capacityPerCycle; x++) {
+ assertThat(gatedRateLimiter.acquire()).isCompleted();
+ }
+
+ CompletionStage<Void> postInitialBatch = gatedRateLimiter.acquire();
+ assertThat(postInitialBatch).isNotCompleted();
+
+ gatedRateLimiter.notifyCheckpointComplete(0);
+
+ assertThat(postInitialBatch).isCompleted();
+ for (int x = 0; x < capacityPerCycle - 1; x++) {
+ assertThat(gatedRateLimiter.acquire()).isCompleted();
+ }
+
+ CompletionStage<Void> postCheckpoint = gatedRateLimiter.acquire();
+ assertThat(postCheckpoint).isNotCompleted();
+ }
+}
diff --git
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
new file mode 100644
index 00000000000..ed9df3f4a71
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** An integration test for rate limiting built into the DataGeneratorSource.
*/
+public class RateLimitedSourceReaderITCase extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+
+ @RegisterExtension
+ private static final MiniClusterExtension miniClusterExtension =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .build());
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ @DisplayName("Rate limiter is used correctly.")
+ public void testRateLimitingParallelExecution() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ final int count = 10;
+
+ final MockRateLimiterStrategy rateLimiterStrategy = new
MockRateLimiterStrategy();
+
+ final DataGeneratorSource<Long> dataGeneratorSource =
+ new DataGeneratorSource<>(index -> index, 10,
rateLimiterStrategy, Types.LONG);
+
+ final DataStream<Long> stream =
+ env.fromSource(
+ dataGeneratorSource, WatermarkStrategy.noWatermarks(),
"generator source");
+
+ final List<Long> result = stream.executeAndCollect(10000);
+ int rateLimiterCallCount =
MockRateLimiterStrategy.getRateLimitersCallCount();
+
+ assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 9));
+ assertThat(rateLimiterCallCount).isGreaterThan(count);
+ }
+
+ private List<Long> range(int startInclusive, int endInclusive) {
+ return LongStream.rangeClosed(startInclusive, endInclusive)
+ .boxed()
+ .collect(Collectors.toList());
+ }
+
+ private static final class MockRateLimiter implements RateLimiter {
+
+ int callCount;
+
+ @Override
+ public CompletableFuture<Void> acquire() {
+ callCount++;
+ return CompletableFuture.completedFuture(null);
+ }
+
+ public int getCallCount() {
+ return callCount;
+ }
+ }
+
+ private static class MockRateLimiterStrategy implements
RateLimiterStrategy {
+
+ private static final List<MockRateLimiter> rateLimiters = new
ArrayList<>();
+
+ @Override
+ public RateLimiter createRateLimiter(int parallelism) {
+ MockRateLimiter mockRateLimiter = new MockRateLimiter();
+ rateLimiters.add(mockRateLimiter);
+ return mockRateLimiter;
+ }
+
+ public static int getRateLimitersCallCount() {
+ return
rateLimiters.stream().mapToInt(MockRateLimiter::getCallCount).sum();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index b5e4ee04a28..4c4a173ef7e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2135,6 +2135,7 @@ under the License.
<!-- MARKER:
start exclusions; these will be wiped by
tools/releasing/update_japicmp_configuration.sh -->
<!--
UnionSerializerConfigSnapshot was a PublicEvolving and Deprecated class that
has been removed, embedded inside a Public CoGroupedStreams class, triggering
this false failure -->
<exclude>org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializerConfigSnapshot</exclude>
+
<exclude>org.apache.flink.api.connector.source.SourceReaderContext#currentParallelism()</exclude>
<!-- MARKER:
end exclusions -->
</excludes>
<accessModifier>public</accessModifier>