Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6130#discussion_r193637243
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
---
@@ -970,6 +991,37 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
+ /**
+ * Reads the given file line-by-line and creates a data stream that contains a string with the
+ * contents of each such line. The {@link java.nio.charset.Charset} with the given name will be
+ * used to read the files.
+ *
+ * <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+ * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+ * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+ * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+ * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @param numTimes
+ * The number of times to read the file
+ * @return The data stream that represents the data read from the given file as text lines
+ */
+ public DataStreamSource<String> readTextFile(String filePath, String charsetName, int numTimes) {
+ Preconditions.checkNotNull(filePath, "The file path must not be null.");
+ Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
--- End diff --
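Should this be
`Preconditions.checkArgument(!filePath.isEmpty(), "The file path must not be empty.");`?
As written, `checkNotNull` receives the boxed result of `filePath.isEmpty()`,
which is never null, so an empty path is never rejected.
BTW, maybe we could use `Strings.isNullOrEmpty(filePath)` to merge these
two checks into one.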
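A minimal sketch of what the merged check could look like. The class `FilePathCheck` and the method `requireNonEmptyPath` are hypothetical names used only for illustration, and the local `checkArgument` is a stand-in for `Preconditions.checkArgument` so that the snippet compiles on its own:

```java
public final class FilePathCheck {

    // Hypothetical stand-in for Preconditions.checkArgument(boolean, Object),
    // so that this sketch compiles without Flink on the classpath.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // One check covering both the null case and the empty case.
    // The null test runs first, so isEmpty() is never called on null.
    public static String requireNonEmptyPath(String filePath) {
        checkArgument(filePath != null && !filePath.isEmpty(),
            "The file path must not be null or empty.");
        return filePath;
    }

    public static void main(String[] args) {
        System.out.println(requireNonEmptyPath("file:///some/local/file"));
        try {
            requireNonEmptyPath("");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One thing to keep in mind with this variant: a null path now fails with an `IllegalArgumentException` instead of the `NullPointerException` that `checkNotNull` throws, so keeping the two checks separate may still be preferable if callers rely on the exception type.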
---