[ https://issues.apache.org/jira/browse/FLINK-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15228218#comment-15228218 ]
ASF GitHub Bot commented on FLINK-3444:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/1857#discussion_r58701096
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
@@ -777,7 +777,50 @@ public CsvReader readCsvFile(String filePath) {
throw new IllegalArgumentException("The number of elements must not be zero.");
}
- return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
+ TypeInformation<X> typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForObject(data[0]);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+ + "; please specify the TypeInformation manually via "
+ + "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)");
+ }
+
+ return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a new data set that contains the given elements. The framework will determine the type according to the
+ * based type user supplied. The elements should be the same or be the subclass to the based type.
+ * The sequence of elements must not be empty.
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a parallelism of one.
+ *
+ * @param type The base class type for every element in the collection.
+ * @param data The elements to make up the data set.
+ * @return A DataSet representing the given list of elements.
+ */
+ @SafeVarargs
+ public final <X> DataSource<X> fromElements(Class<X> type, X... data) {
+ if (data == null) {
+ throw new IllegalArgumentException("The data must not be null.");
+ }
+ if (data.length == 0) {
+ throw new IllegalArgumentException("The number of elements must not be zero.");
+ }
+
+ TypeInformation<X> typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForClass(type);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+ + "; please specify the TypeInformation manually via "
+ + "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)");
--- End diff --
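The error message should reference ExecutionEnvironment#fromCollection(Collection, TypeInformation) rather than the StreamExecutionEnvironment variant, since this class belongs to the DataSet API.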
> env.fromElements relies on the first input element for determining the DataSet/DataStream type
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-3444
> URL: https://issues.apache.org/jira/browse/FLINK-3444
> Project: Flink
> Issue Type: Bug
> Components: DataSet API, DataStream API
> Affects Versions: 0.10.0, 1.0.0
> Reporter: Vasia Kalavri
>
> The {{fromElements}} method of the {{ExecutionEnvironment}} and
> {{StreamExecutionEnvironment}} determines the DataSet/DataStream type by
> extracting the type of the first input element.
> This is problematic if the type of the first element is a subtype of the
> type of another element in the collection.
> For example, the following
> {code}
> DataStream<Event> input = env.fromElements(new Event(1, "a"), new SubEvent(2, "b"));
> {code}
> succeeds, while the following
> {code}
> DataStream<Event> input = env.fromElements(new SubEvent(1, "a"), new Event(2, "b"));
> {code}
> fails with "java.lang.IllegalArgumentException: The elements in the
> collection are not all subclasses of SubEvent".
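
The order dependence described in the issue can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink's actual implementation: the class FromElementsSketch and the methods inferFromFirst and withExplicitType are hypothetical names that only mimic the two strategies, i.e. taking the type from the first element versus passing the base class explicitly, as the fromElements(Class<X>, X...) overload in the diff does. The Event and SubEvent classes are stand-ins for the ones in the issue.

```java
public class FromElementsSketch {

    // Stand-ins for the Event/SubEvent classes mentioned in the issue.
    static class Event {
        final int id;
        final String name;
        Event(int id, String name) { this.id = id; this.name = name; }
    }

    static class SubEvent extends Event {
        SubEvent(int id, String name) { super(id, name); }
    }

    // Hypothetical: derive the element type from the first element only.
    static Class<?> inferFromFirst(Object... data) {
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("The number of elements must not be zero.");
        }
        Class<?> type = data[0].getClass();
        checkAll(type, data);
        return type;
    }

    // Hypothetical: the caller supplies the base type, so order is irrelevant.
    @SafeVarargs
    static <X> Class<X> withExplicitType(Class<X> type, X... data) {
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("The number of elements must not be zero.");
        }
        checkAll(type, data);
        return type;
    }

    // Rejects any element that is not an instance of the chosen type.
    private static void checkAll(Class<?> type, Object[] data) {
        for (Object element : data) {
            if (!type.isInstance(element)) {
                throw new IllegalArgumentException(
                    "The elements in the collection are not all subclasses of "
                        + type.getSimpleName());
            }
        }
    }

    public static void main(String[] args) {
        // Base type first: the inferred type is Event, and SubEvent passes the check.
        System.out.println(inferFromFirst(new Event(1, "a"), new SubEvent(2, "b")).getSimpleName());

        // Subtype first: the inferred type is SubEvent, so the plain Event is rejected.
        try {
            inferFromFirst(new SubEvent(1, "a"), new Event(2, "b"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // Explicit base type: the same elements are accepted in either order.
        System.out.println(
            withExplicitType(Event.class, new SubEvent(1, "a"), new Event(2, "b")).getSimpleName());
    }
}
```

With the explicit base type the check is made against Event regardless of which element comes first, which is the behaviour the new overload is meant to provide.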