[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504293#comment-16504293
 ] 

ASF GitHub Bot commented on FLINK-9545:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193637243
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -970,6 +991,37 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
                return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
        }
     
    +   /**
    +    * Reads the given file line-by-line and creates a data stream that contains a string with the
    +    * contents of each such line. The {@link java.nio.charset.Charset} with the given name will be
    +    * used to read the files.
    +    *
    +    * <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
    +    * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
    +    * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
    +    * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
    +    * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
    +    *
    +    * @param filePath
    +    *              The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +    * @param charsetName
    +    *              The name of the character set used to read the file
    +    * @param numTimes
    +    *              The number of times to read the file
    +    * @return The data stream that represents the data read from the given file as text lines
    +    */
    +   public DataStreamSource<String> readTextFile(String filePath, String charsetName, int numTimes) {
    +           Preconditions.checkNotNull(filePath, "The file path must not be null.");
    +           Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
    --- End diff --
    
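    Should this be `Preconditions.checkArgument(!filePath.isEmpty(), "The file path must not be empty.");`? As written, `checkNotNull` receives the boxed result of `isEmpty()`, which is never null, so the check can never fail.
    BTW, maybe we could use `Strings.isNullOrEmpty(filePath)` to merge these two checks into one.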
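
A minimal sketch of the merged check, written in plain Java so that it runs without Flink or Guava on the classpath. The class `FilePathCheck` and the method `checkFilePath` are hypothetical names used only for illustration; they are not part of the pull request. Inside Flink, the same idea would presumably be a single `Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath), ...)` call. One thing to keep in mind: merging the checks means a null path is reported as an `IllegalArgumentException` rather than the `NullPointerException` that `checkNotNull` throws.

```java
public final class FilePathCheck {

    private FilePathCheck() {
    }

    /**
     * Hypothetical stand-in for the merged precondition: rejects both
     * null and empty paths with a single check and a single message.
     */
    public static String checkFilePath(String filePath) {
        // The null test must come first so that isEmpty() is never
        // called on a null reference.
        if (filePath == null || filePath.isEmpty()) {
            throw new IllegalArgumentException(
                "The file path must not be null or empty.");
        }
        return filePath;
    }

    public static void main(String[] args) {
        // A valid path passes through unchanged.
        System.out.println(checkFilePath("file:///some/local/file"));

        // Both invalid inputs are rejected by the same check.
        for (String bad : new String[] {null, ""}) {
            try {
                checkFilePath(bad);
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
}
```

If keeping the distinction between a null path and an empty path matters to callers, the two separate checks (with the second one negated) may still be the better choice.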


> Support read a file multiple times in Flink DataStream 
> -------------------------------------------------------
>
>                 Key: FLINK-9545
>                 URL: https://issues.apache.org/jira/browse/FLINK-9545
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.6.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.6.0
>
>
> We need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each 
> file N times, but currently they only support reading a file once.
> Add support for this feature.
> Plan:
> Add a new processing mode, PROCESSING_N_TIMES, and an additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
