[
https://issues.apache.org/jira/browse/FLINK-1081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228343#comment-14228343
]
ASF GitHub Bot commented on FLINK-1081:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/226#discussion_r21037263
--- Diff:
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
---
@@ -17,33 +17,125 @@
package org.apache.flink.streaming.api.function.source;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+
import java.io.BufferedReader;
-import java.io.FileReader;
import java.io.IOException;
-
-import org.apache.flink.util.Collector;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class FileStreamFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
- private final String path;
+ public enum WatchType {
+ ONLY_NEW_FILES, // Only new files will be processed.
+ REPROCESS_WITH_APPENDED, // When some files are appended, all
contents of the files will be processed.
+ PROCESS_ONLY_APPENDED // When some files are appended, only
appended contents will be processed.
+ }
- public FileStreamFunction(String path) {
+ private String path;
+ private long interval;
+ private WatchType watchType;
+
+ private FileSystem fileSystem;
+ private long lastModificationTime;
+ private Map<String, Long> offsetOfFiles;
+
+ public FileStreamFunction(String path, long interval, WatchType
watchType) {
this.path = path;
+ this.interval = interval;
+ this.watchType = watchType;
+
+ this.lastModificationTime = System.currentTimeMillis();
+ this.offsetOfFiles = new HashMap<String, Long>();
}
@Override
- public void invoke(Collector<String> collector) throws IOException {
+ public void invoke(Collector<String> collector) throws Exception {
+ fileSystem = FileSystem.get(new URI(path));
+
while (true) {
- BufferedReader br = new BufferedReader(new
FileReader(path));
- String line = br.readLine();
- while (line != null) {
- if (!line.equals("")) {
- collector.collect(line);
+ List<String> files = listNewFiles();
+ for (String filePath : files) {
+ if (watchType == WatchType.ONLY_NEW_FILES ||
watchType == WatchType.REPROCESS_WITH_APPENDED) {
+ processEntire(collector, filePath);
+ } else {
+ processOnlyAppended(collector,
filePath);
}
- line = br.readLine();
}
- br.close();
+
+ Thread.sleep(interval);
}
}
+
+ private void processEntire(Collector<String> collector, String path)
throws IOException {
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(fileSystem.open(new Path(path))));
+ String line;
+
+ try {
+ while ((line = reader.readLine()) != null) {
+ collector.collect(line);
+ }
+
+ offsetOfFiles.put(path, 0L);
+ } finally {
+ reader.close();
+ }
+ }
+
+ private void processOnlyAppended(Collector<String> collector, String
path) throws IOException {
+ FSDataInputStream stream = fileSystem.open(new Path(path));
+ if (offsetOfFiles.containsKey(path)) {
+ stream.seek(offsetOfFiles.get(path));
+ }
+
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(stream));
+ String line;
+
+ try {
+ while ((line = reader.readLine()) != null) {
+ collector.collect(line);
+ }
+
+ offsetOfFiles.put(path, stream.getPos());
+ System.out.println(path + ": " +
String.valueOf(stream.getPos()));
--- End diff --
But maybe it's a good idea to have some LOG.debug / LOG.info messages in
the code.
> Add HDFS file-stream source for streaming
> -----------------------------------------
>
> Key: FLINK-1081
> URL: https://issues.apache.org/jira/browse/FLINK-1081
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.7.0-incubating
> Reporter: Gyula Fora
> Assignee: Chiwan Park
> Labels: starter
>
> Add a data stream source that will monitor a selected directory on HDFS (or
> other filesystems as well) and will process all new files created.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)