GitHub user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6130#discussion_r193637932
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
---
@@ -1161,7 +1301,53 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
inputFormat.setFilePath(filePath);
- return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
+ return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval, 1);
+ }
+
+ /**
+ * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+ * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
+ * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), process once or {@code numTimes} times the data currently in
+ * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
+ * In addition, if the path contains files not to be processed, the user can specify a custom {@link FilePathFilter}.
+ * As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
+ *
+ * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES},
+ * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+ * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+ * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+ * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param watchType
+ * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+ * @param typeInformation
+ * Information on the type of the elements in the output stream
+ * @param interval
+ * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+ * @param <OUT>
+ * The type of the returned data stream
+ * @param numTimes
+ * The number of times to read the file
+ * @return The data stream that represents the data read from the given file
+ */
+ @PublicEvolving
+ public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+ String filePath,
+ FileProcessingMode watchType,
+ long interval,
+ TypeInformation<OUT> typeInformation,
+ int numTimes) {
+
+ Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
+ Preconditions.checkNotNull(filePath, "The file path must not be null.");
+ Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
--- End diff --
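Again, it looks like this should be `Preconditions.checkArgument()`. `filePath.isEmpty()` returns a `boolean`, which is autoboxed to a non-null `Boolean`, so `checkNotNull` can never fail here and an empty path passes the check.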
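
To illustrate the difference, here is a minimal, self-contained sketch. The class `PreconditionsSketch` and its helper methods are hypothetical stand-ins written for this example; they only approximate the behavior of the two `Preconditions` methods and are not Flink's actual implementation.

```java
public class PreconditionsSketch {

    // Stand-in for checkNotNull (assumption): fails only when the reference is null.
    static <T> T checkNotNull(T reference, String message) {
        if (reference == null) {
            throw new NullPointerException(message);
        }
        return reference;
    }

    // Stand-in for checkArgument (assumption): fails when the condition is false.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Mirrors the check in the diff. The boolean result of isEmpty() is
    // autoboxed to a non-null Boolean, so this never rejects any path.
    static boolean rejectedByCheckNotNull(String filePath) {
        try {
            checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // The suggested form: the condition is negated so that an empty path fails.
    static boolean rejectedByCheckArgument(String filePath) {
        try {
            checkArgument(!filePath.isEmpty(), "The file path must not be empty.");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedByCheckNotNull(""));                          // false
        System.out.println(rejectedByCheckArgument(""));                         // true
        System.out.println(rejectedByCheckArgument("file:///some/local/file"));  // false
    }
}
```

Note that the condition has to be negated when switching methods: `checkArgument(!filePath.isEmpty(), ...)`.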
---