Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6130#discussion_r193638907
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
---
@@ -1355,26 +1541,39 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
- long interval) {
+ long interval,
+ int numTimes) {
Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
- Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
-     interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
-     "The path monitoring interval cannot be less than " +
-     ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
+ switch (monitoringMode) {
+     case PROCESS_ONCE:
+         Preconditions.checkArgument(numTimes == 1,
+             "The specified number of times to read a file should be 1, but is " + numTimes);
+         break;
+     case PROCESS_N_TIMES:
+         Preconditions.checkArgument(numTimes >= 1,
+             "The specified number of times to read a file should be no less than 1, but is " + numTimes);
+         break;
+     case PROCESS_CONTINUOUSLY:
+         Preconditions.checkArgument(interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+             String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
+                 ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, FileProcessingMode.PROCESS_CONTINUOUSLY));
+         break;
+     default:
--- End diff --
How about raising an `IllegalArgumentException` here, in the `default` branch?
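
Roughly what I have in mind, as a standalone sketch rather than the actual patch. `FileSourceArgs`, its nested `FileProcessingMode` enum, the local `checkArgument` helper, and the value of `MIN_MONITORING_INTERVAL` are stand-ins I made up so the snippet compiles without Flink on the classpath; the real code would keep using `Preconditions` and `ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL`.

```java
final class FileSourceArgs {

    // Stand-in for Flink's FileProcessingMode; PROCESS_N_TIMES is the value this PR proposes.
    enum FileProcessingMode { PROCESS_ONCE, PROCESS_N_TIMES, PROCESS_CONTINUOUSLY }

    // Placeholder for ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL (value assumed).
    static final long MIN_MONITORING_INTERVAL = 1L;

    static void validate(FileProcessingMode monitoringMode, long interval, int numTimes) {
        if (monitoringMode == null) {
            throw new NullPointerException("Unspecified monitoring mode.");
        }
        switch (monitoringMode) {
            case PROCESS_ONCE:
                checkArgument(numTimes == 1,
                    "The specified number of times to read a file should be 1, but is " + numTimes);
                break;
            case PROCESS_N_TIMES:
                checkArgument(numTimes >= 1,
                    "The specified number of times to read a file should be no less than 1, but is " + numTimes);
                break;
            case PROCESS_CONTINUOUSLY:
                checkArgument(interval >= MIN_MONITORING_INTERVAL,
                    String.format("The path monitoring interval cannot be less than %d ms in %s mode.",
                        MIN_MONITORING_INTERVAL, FileProcessingMode.PROCESS_CONTINUOUSLY));
                break;
            default:
                // Fail fast if a mode is added to the enum later without a matching case.
                throw new IllegalArgumentException("Unsupported monitoring mode: " + monitoringMode);
        }
    }

    // Minimal local equivalent of a checkArgument-style precondition helper.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

With the `default` branch throwing, a mode added to the enum later without its own case would be rejected instead of silently skipping validation.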
---