Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/226#discussion_r21037226
  
    --- Diff: flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java ---
    @@ -17,33 +17,125 @@
     
     package org.apache.flink.streaming.api.function.source;
     
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.util.Collector;
    +
     import java.io.BufferedReader;
    -import java.io.FileReader;
     import java.io.IOException;
    -
    -import org.apache.flink.util.Collector;
    +import java.io.InputStreamReader;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
     
     public class FileStreamFunction implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;
     
    -   private final String path;
    +   public enum WatchType {
    +           ONLY_NEW_FILES, // Only new files will be processed.
    +           REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed.
    +           PROCESS_ONLY_APPENDED // When some files are appended, only appended contents will be processed.
    +   }
     
    -   public FileStreamFunction(String path) {
    +   private String path;
    +   private long interval;
    +   private WatchType watchType;
    +
    +   private FileSystem fileSystem;
    +   private long lastModificationTime;
    +   private Map<String, Long> offsetOfFiles;
    +
    +   public FileStreamFunction(String path, long interval, WatchType 
watchType) {
                this.path = path;
    +           this.interval = interval;
    +           this.watchType = watchType;
    +
    +           this.lastModificationTime = System.currentTimeMillis();
    +           this.offsetOfFiles = new HashMap<String, Long>();
        }
     
        @Override
    -   public void invoke(Collector<String> collector) throws IOException {
    +   public void invoke(Collector<String> collector) throws Exception {
    +           fileSystem = FileSystem.get(new URI(path));
    +
                while (true) {
    -                   BufferedReader br = new BufferedReader(new 
FileReader(path));
    -                   String line = br.readLine();
    -                   while (line != null) {
    -                           if (!line.equals("")) {
    -                                   collector.collect(line);
    +                   List<String> files = listNewFiles();
    +                   for (String filePath : files) {
    +                           if (watchType == WatchType.ONLY_NEW_FILES || 
watchType == WatchType.REPROCESS_WITH_APPENDED) {
    +                                   processEntire(collector, filePath);
    +                           } else {
    +                                   processOnlyAppended(collector, 
filePath);
                                }
    -                           line = br.readLine();
                        }
    -                   br.close();
    +
    +                   Thread.sleep(interval);
                }
        }
    +
    +   private void processEntire(Collector<String> collector, String path) 
throws IOException {
    +           BufferedReader reader = new BufferedReader(new 
InputStreamReader(fileSystem.open(new Path(path))));
    +           String line;
    +
    +           try {
    +                   while ((line = reader.readLine()) != null) {
    +                           collector.collect(line);
    +                   }
    +
    +                   offsetOfFiles.put(path, 0L);
    +           } finally {
    +                   reader.close();
    +           }
    +   }
    +
    +   private void processOnlyAppended(Collector<String> collector, String 
path) throws IOException {
    +           FSDataInputStream stream = fileSystem.open(new Path(path));
    +           if (offsetOfFiles.containsKey(path)) {
    +                   stream.seek(offsetOfFiles.get(path));
    +           }
    +
    +           BufferedReader reader = new BufferedReader(new 
InputStreamReader(stream));
    +           String line;
    +
    +           try {
    +                   while ((line = reader.readLine()) != null) {
    +                           collector.collect(line);
    +                   }
    +
    +                   offsetOfFiles.put(path, stream.getPos());
    +                   System.out.println(path + ": " + 
String.valueOf(stream.getPos()));
    --- End diff --
    
    This `System.out.println` call seems to be leftover debugging code ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to