[
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504307#comment-16504307
]
ASF GitHub Bot commented on FLINK-9545:
---------------------------------------
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6130#discussion_r193638907
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
---
@@ -1355,26 +1541,39 @@ public TimeCharacteristic
getStreamTimeCharacteristic() {
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
-
long interval) {
+
long interval,
+
int numTimes) {
Preconditions.checkNotNull(inputFormat, "Unspecified file input
format.");
Preconditions.checkNotNull(typeInfo, "Unspecified output type
information.");
Preconditions.checkNotNull(sourceName, "Unspecified name for
the source.");
Preconditions.checkNotNull(monitoringMode, "Unspecified
monitoring mode.");
-
Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE)
||
- interval >=
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
- "The path monitoring interval cannot be less than " +
-
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
+ switch (monitoringMode) {
+ case PROCESS_ONCE:
+ Preconditions.checkArgument(numTimes == 1,
+ "The specified number of times to read
a file should be 1, but is " + numTimes);
+ break;
+ case PROCESS_N_TIMES:
+ Preconditions.checkArgument(numTimes >= 1,
+ "The specified number of times to read
a file should be no less than 1, but is " + numTimes);
+ break;
+ case PROCESS_CONTINUOUSLY:
+ Preconditions.checkArgument(interval >=
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+ String.format("The path monitoring
interval cannot be less than %d ms in %s mode.",
+
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
FileProcessingMode.PROCESS_CONTINUOUSLY));
+ break;
+ default:
--- End diff --
How about raising an `IllegalArgumentException` here?
> Support read a file multiple times in Flink DataStream
> -------------------------------------------------------
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.6.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Fix For: 1.6.0
>
>
> we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each
> file for N times, but currently it only supports reading file once.
> add support for the feature.
> Plan:
> add a new processing mode as PROCESSING_N_TIMES, and add additional parameter
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)