[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504307#comment-16504307
 ] 

ASF GitHub Bot commented on FLINK-9545:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193638907
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1355,26 +1541,39 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
                                        TypeInformation<OUT> typeInfo,
                                        String sourceName,
                                        FileProcessingMode monitoringMode,
    -                                   long interval) {
    +                                   long interval,
    +                                   int numTimes) {
     
                Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
                Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
                Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
                Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
     
    -           Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
    -                           interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
    -                   "The path monitoring interval cannot be less than " +
    -                                   ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
    +           switch (monitoringMode) {
    +                   case PROCESS_ONCE:
    +                           Preconditions.checkArgument(numTimes == 1,
    +                                   "The specified number of times to read a file should be 1, but is " + numTimes);
    +                           break;
    +                   case PROCESS_N_TIMES:
    +                           Preconditions.checkArgument(numTimes >= 1,
    +                                   "The specified number of times to read a file should be no less than 1, but is " + numTimes);
    +                           break;
    +                   case PROCESS_CONTINUOUSLY:
    +                           Preconditions.checkArgument(interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
    +                                   String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
    +                                           ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, FileProcessingMode.PROCESS_CONTINUOUSLY));
    +                           break;
    +                   default:
    --- End diff --
    
    How about raising an `IllegalArgumentException` here?
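
A minimal sketch of what that suggestion could look like. The diff is cut off at the `default:` label, so the body of that branch here is an assumption, not the code in the pull request. The class `FileSourceArgs`, the nested `Mode` enum, the `validate` and `checkArgument` helpers, and the constant value `1L` are stand-ins invented for this illustration so that the snippet compiles without Flink on the classpath.

```java
import java.util.Objects;

// Hypothetical stand-in for the argument checks in the diff; not Flink code.
class FileSourceArgs {

    // Stand-in for FileProcessingMode, including the proposed PROCESS_N_TIMES.
    enum Mode { PROCESS_ONCE, PROCESS_N_TIMES, PROCESS_CONTINUOUSLY }

    // Placeholder for ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL (value assumed).
    static final long MIN_MONITORING_INTERVAL = 1L;

    static void validate(Mode mode, long interval, int numTimes) {
        Objects.requireNonNull(mode, "Unspecified monitoring mode.");
        switch (mode) {
            case PROCESS_ONCE:
                checkArgument(numTimes == 1,
                    "The specified number of times to read a file should be 1, but is " + numTimes);
                break;
            case PROCESS_N_TIMES:
                checkArgument(numTimes >= 1,
                    "The specified number of times to read a file should be no less than 1, but is " + numTimes);
                break;
            case PROCESS_CONTINUOUSLY:
                checkArgument(interval >= MIN_MONITORING_INTERVAL,
                    String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
                        MIN_MONITORING_INTERVAL, Mode.PROCESS_CONTINUOUSLY));
                break;
            default:
                // Fail loudly instead of silently accepting a mode that has no validation rule.
                throw new IllegalArgumentException("Unsupported monitoring mode: " + mode);
        }
    }

    // Minimal replacement for Preconditions.checkArgument(boolean, Object).
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        validate(Mode.PROCESS_N_TIMES, 0L, 3); // accepted: numTimes >= 1
        try {
            validate(Mode.PROCESS_ONCE, 0L, 2); // rejected: PROCESS_ONCE requires numTimes == 1
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because every enum constant has its own case, the `default` branch is reached only if a new mode is added later without a matching case, which is the situation the exception is meant to surface.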


> Support read a file multiple times in Flink DataStream 
> -------------------------------------------------------
>
>                 Key: FLINK-9545
>                 URL: https://issues.apache.org/jira/browse/FLINK-9545
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.6.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.6.0
>
>
> We need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each 
> file N times, but currently they only support reading a file once.
> Add support for this feature.
> Plan:
> Add a new processing mode, PROCESSING_N_TIMES, and an additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
