Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/1857#discussion_r58701096
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
@@ -777,7 +777,50 @@ public CsvReader readCsvFile(String filePath) {
throw new IllegalArgumentException("The number of
elements must not be zero.");
}
- return fromCollection(Arrays.asList(data),
TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
+ TypeInformation<X> typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForObject(data[0]);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create
TypeInformation for type " + data[0].getClass().getName()
+ + "; please specify the TypeInformation
manually via "
+ +
"StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)");
+ }
+
+ return fromCollection(Arrays.asList(data), typeInfo,
Utils.getCallLocationName());
+ }
+
+ /**
+ * Creates a new data set that contains the given elements. The
framework will determine the type according to the
+ * based type user supplied. The elements should be the same or be the
subclass to the based type.
+ * The sequence of elements must not be empty.
+ * Note that this operation will result in a non-parallel data source,
i.e. a data source with
+ * a parallelism of one.
+ *
+ * @param type The base class type for every element in the collection.
+ * @param data The elements to make up the data set.
+ * @return A DataSet representing the given list of elements.
+ */
+ @SafeVarargs
+ public final <X> DataSource<X> fromElements(Class<X> type, X... data) {
+ if (data == null) {
+ throw new IllegalArgumentException("The data must not
be null.");
+ }
+ if (data.length == 0) {
+ throw new IllegalArgumentException("The number of
elements must not be zero.");
+ }
+
+ TypeInformation<X> typeInfo;
+ try {
+ typeInfo = TypeExtractor.getForClass(type);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not create
TypeInformation for type " + type.getName()
+ + "; please specify the TypeInformation
manually via "
+ +
"StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)");
--- End diff --
This should be ExecutionEnvironment#fromCollection
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---