Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6130#discussion_r193637758
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
@@ -1093,6 +1178,59 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 		return readFile(inputFormat, filePath, watchType, interval, typeInformation);
 	}
+	/**
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+	 * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every
+	 * {@code interval} ms) the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process
+	 * the data currently in the path once or {@code numTimes} times and then exit
+	 * ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
+	 * In addition, if the path contains files not to be processed, the user can specify a custom
+	 * {@link FilePathFilter}. As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
+	 *
+	 * <p>Since all data streams need specific information about their types, this method needs to determine the
+	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable}
+	 * interface. In the latter case, this method will invoke the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine the
+	 * data type produced by the input format.
+	 *
+	 * <p><b>NOTES ON CHECKPOINTING:</b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE}
+	 * or {@link FileProcessingMode#PROCESS_N_TIMES}, the source monitors the path <b>once</b>, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards them to the
+	 * downstream {@link ContinuousFileReaderOperator readers} to read the actual data, and exits, without waiting
+	 * for the readers to finish reading. This implies that no more checkpoint barriers are going to be forwarded
+	 * after the source exits, thus having no checkpoints after that point.
+	 *
+	 * @param inputFormat
+	 *          The input format used to create the data stream
+	 * @param filePath
+	 *          The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param watchType
+	 *          The mode in which the source should operate, i.e. monitor the path and react to new data, or process once and exit
+	 * @param interval
+	 *          In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+	 * @param <OUT>
+	 *          The type of the returned data stream
+	 * @param numTimes
+	 *          The number of times to read the file
+	 * @return The data stream that represents the data read from the given file
+	 */
+	@PublicEvolving
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath,
+												FileProcessingMode watchType,
+												long interval,
+												int numTimes) {
+
+		TypeInformation<OUT> typeInformation;
+		try {
+			typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
+		} catch (Exception e) {
+			throw new InvalidProgramException("The type returned by the input format could not be " +
+					"automatically determined. Please specify the TypeInformation of the produced type " +
+					"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
--- End diff ---
Maybe it's better not to swallow the exception here.
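
For illustration, a minimal sketch of that suggestion, assuming `InvalidProgramException` offers a `(String, Throwable)` constructor so the caught exception can be chained as the cause (the message text is copied from the diff above):

```java
} catch (Exception e) {
	// Chain the caught exception as the cause so its stack trace is not lost.
	throw new InvalidProgramException("The type returned by the input format could not be " +
			"automatically determined. Please specify the TypeInformation of the produced type " +
			"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
}
```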
---
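
As an aside, a hypothetical usage sketch of the overload proposed in this diff. It assumes the PR's `FileProcessingMode.PROCESS_N_TIMES` constant and the five-argument `readFile(..., int numTimes)` signature exist as shown above; the path, scan interval, and repeat count are placeholder values.

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileNTimesExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Text files under this (placeholder) directory are read line by line.
		TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input"));

		// Proposed mode from this PR: scan the path, read its contents 3 times, then exit.
		DataStreamSource<String> lines = env.readFile(
				format, "file:///tmp/input", FileProcessingMode.PROCESS_N_TIMES, 100L, 3);

		lines.print();
		env.execute("readFile PROCESS_N_TIMES sketch");
	}
}
```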