This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e44efbff8070dca3489550fdeadc5e1ce31e68c1 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Fri Oct 20 00:58:03 2023 +0200 [FLINK-28229][connectors] Introduce FLIP-27 alternative to StreamExecutionEnvironment#fromCollection() --- .../datagen/source/DataGeneratorSource.java | 3 +- .../environment/StreamExecutionEnvironment.java | 85 ++++++++++++++++------ pom.xml | 2 + 3 files changed, 67 insertions(+), 23 deletions(-) diff --git a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java index a344eb635ad..3d2416c1e16 100644 --- a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java +++ b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java @@ -150,7 +150,8 @@ public class DataGeneratorSource<OUT> this.sourceReaderFactory = checkNotNull(sourceReaderFactory); this.generatorFunction = checkNotNull(generatorFunction); this.typeInfo = checkNotNull(typeInfo); - this.numberSource = new NumberSequenceSource(0, count - 1); + long to = count > 0 ? 
count - 1 : 0; // a noop source (0 elements) is used in Table tests + this.numberSource = new NumberSequenceSource(0, to); ClosureCleaner.clean( generatorFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); ClosureCleaner.clean( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 9069b3a0d3c..18dc49d3895 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1222,7 +1222,23 @@ public class StreamExecutionEnvironment implements AutoCloseable { return fromData(Arrays.asList(data), typeInfo); } - private <OUT> DataStreamSource<OUT> fromData( + /** + * Creates a new data stream that contains the given elements. The elements must all be of the + * same type, for example, all of the {@link String} or {@link Integer}. + * + * <p>The framework will try and determine the exact type from the elements. In case of generic + * elements, it may be necessary to manually supply the type information via {@link + * #fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...)}. + * + * <p>NOTE: This creates a non-parallel data stream source by default (parallelism of one). + * Adjustment of parallelism is supported via {@code setParallelism()} on the result. + * + * @param data The collection of elements to create the data stream from. + * @param typeInfo The type information of the elements. + * @param <OUT> The generic type of the returned data stream. 
+ * @return The data stream representing the given collection */ + public <OUT> DataStreamSource<OUT> fromData( Collection<OUT> data, TypeInformation<OUT> typeInfo) { Preconditions.checkNotNull(data, "Collection must not be null"); @@ -1273,6 +1289,51 @@ public class StreamExecutionEnvironment implements AutoCloseable { return fromData(Arrays.asList(data), typeInfo); } + /** + * Creates a new data stream that contains the given elements. The type of the data stream is + * that of the elements in the collection. + * + * <p>The framework will try and determine the exact type from the collection elements. In case + * of generic elements, it may be necessary to manually supply the type information via {@link + * #fromData(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}. + * + * <p>NOTE: This creates a non-parallel data stream source by default (parallelism of one). + * Adjustment of parallelism is supported via {@code setParallelism()} on the result. + * + * @param data The collection of elements to create the data stream from. + * @param <OUT> The generic type of the returned data stream. 
+ * @return The data stream representing the given collection + */ + public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data) { + TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data); + return fromData(data, typeInfo); + } + + private static <OUT> TypeInformation<OUT> extractTypeInfoFromCollection(Collection<OUT> data) { + Preconditions.checkNotNull(data, "Collection must not be null"); + if (data.isEmpty()) { + throw new IllegalArgumentException("Collection must not be empty"); + } + + OUT first = data.iterator().next(); + if (first == null) { + throw new IllegalArgumentException("Collection must not contain null elements"); + } + + TypeInformation<OUT> typeInfo; + try { + typeInfo = TypeExtractor.getForObject(first); + } catch (Exception e) { + throw new RuntimeException( + "Could not create TypeInformation for type " + + first.getClass() + + "; please specify the TypeInformation manually via the version of the " + + "method that explicitly accepts it as an argument.", + e); + } + return typeInfo; + } + /** * Creates a new data stream that contains a sequence of numbers. 
This is a parallel source, if * you manually set the parallelism to {@code 1} (using {@link @@ -1415,27 +1476,7 @@ public class StreamExecutionEnvironment implements AutoCloseable { * @return The data stream representing the given collection */ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) { - Preconditions.checkNotNull(data, "Collection must not be null"); - if (data.isEmpty()) { - throw new IllegalArgumentException("Collection must not be empty"); - } - - OUT first = data.iterator().next(); - if (first == null) { - throw new IllegalArgumentException("Collection must not contain null elements"); - } - - TypeInformation<OUT> typeInfo; - try { - typeInfo = TypeExtractor.getForObject(first); - } catch (Exception e) { - throw new RuntimeException( - "Could not create TypeInformation for type " - + first.getClass() - + "; please specify the TypeInformation manually via " - + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", - e); - } + TypeInformation<OUT> typeInfo = extractTypeInfoFromCollection(data); return fromCollection(data, typeInfo); } diff --git a/pom.xml b/pom.xml index c964ea7c9a6..18c8b4b3f49 100644 --- a/pom.xml +++ b/pom.xml @@ -2299,6 +2299,8 @@ under the License. 
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Object[])</exclude> <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(org.apache.flink.api.common.typeinfo.TypeInformation,java.lang.Object[])</exclude> <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.lang.Class,java.lang.Object[])</exclude> + <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude> + <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude> <!-- MARKER: end exclusions --> </excludes> <accessModifier>public</accessModifier>