GitHub user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193638907
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1355,26 +1541,39 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
                                    TypeInformation<OUT> typeInfo,
                                    String sourceName,
                                    FileProcessingMode monitoringMode,
    -                               long interval) {
    +                               long interval,
    +                               int numTimes) {
     
                Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
                Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
                Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
                Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
     
    -           Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
    -                           interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
    -                   "The path monitoring interval cannot be less than " +
    -                                   ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
    +           switch (monitoringMode) {
    +                   case PROCESS_ONCE:
    +                           Preconditions.checkArgument(numTimes == 1,
    +                                   "The specified number of times to read a file should be 1, but is " + numTimes);
    +                           break;
    +                   case PROCESS_N_TIMES:
    +                           Preconditions.checkArgument(numTimes >= 1,
    +                                   "The specified number of times to read a file should be no less than 1, but is " + numTimes);
    +                           break;
    +                   case PROCESS_CONTINUOUSLY:
    +                           Preconditions.checkArgument(interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
    +                                   String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
    +                                           ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, FileProcessingMode.PROCESS_CONTINUOUSLY));
    +                           break;
    +                   default:
    --- End diff --
    
    How about raising an `IllegalArgumentException` here?
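
    A minimal sketch of what that could look like, written as a standalone class so it compiles without Flink on the classpath. Everything here is a stand-in: `MonitoringModeCheck`, the `Mode` enum, the local `checkArgument` helper, and the value of `MIN_MONITORING_INTERVAL` are assumptions for illustration, not the PR's actual code. The hypothetical `UNHANDLED_EXAMPLE` constant models a mode added later without a matching `case`.

```java
// Standalone sketch; the names below are stand-ins, not Flink's classes.
final class MonitoringModeCheck {

    // Stand-in for FileProcessingMode as extended by this PR.
    // UNHANDLED_EXAMPLE is hypothetical: it models a constant that was
    // added to the enum without a matching case in the switch below.
    enum Mode { PROCESS_ONCE, PROCESS_N_TIMES, PROCESS_CONTINUOUSLY, UNHANDLED_EXAMPLE }

    // Assumed placeholder for ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL (ms).
    static final long MIN_MONITORING_INTERVAL = 1L;

    static void validate(Mode mode, long interval, int numTimes) {
        switch (mode) {
            case PROCESS_ONCE:
                checkArgument(numTimes == 1,
                    "The specified number of times to read a file should be 1, but is " + numTimes);
                break;
            case PROCESS_N_TIMES:
                checkArgument(numTimes >= 1,
                    "The specified number of times to read a file should be no less than 1, but is " + numTimes);
                break;
            case PROCESS_CONTINUOUSLY:
                checkArgument(interval >= MIN_MONITORING_INTERVAL,
                    String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
                        MIN_MONITORING_INTERVAL, Mode.PROCESS_CONTINUOUSLY));
                break;
            default:
                // Fail fast instead of silently accepting a mode nobody validated.
                throw new IllegalArgumentException("Unsupported file processing mode: " + mode);
        }
    }

    // Local equivalent of a checkArgument-style precondition helper.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

    With this shape, an unrecognized mode fails with the same exception type as the other argument checks, so callers only need to handle one kind of failure.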

