http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 2f0a16a..c8e9846 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -22,11 +22,9 @@ import 
org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -43,44 +41,42 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
+import static 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This is the operator that reads the {@link FileInputSplit FileInputSplits} 
received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator can 
have parallelism
- * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} 
which has
- * a parallelism of 1.
+ * The operator that reads the {@link TimestampedFileInputSplit splits} 
received from the preceding
+ * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link 
ContinuousFileMonitoringFunction}
+ * which has a parallelism of 1, this operator can have DOP > 1.
  * <p/>
- * This operator will receive the split descriptors, put them in a queue, and 
have another
- * thread read the actual data from the split. This architecture allows the 
separation of the
- * reading thread, from the one emitting the checkpoint barriers, thus 
removing any potential
+ * As soon as a split descriptor is received, it is put in a queue, and have 
another
+ * thread read the actual data of the split. This architecture allows the 
separation of the
+ * reading thread from the one emitting the checkpoint barriers, thus removing 
any potential
  * back-pressure.
  */
 @Internal
-public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends 
AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<FileInputSplit, OUT>, 
OutputTypeConfigurable<OUT>, StreamCheckpointedOperator {
+public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OUT>
+       implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, 
OutputTypeConfigurable<OUT>, StreamCheckpointedOperator {
 
        private static final long serialVersionUID = 1L;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
-       /** A value that serves as a kill-pill to stop the reading thread when 
no more splits remain. */
-       private static final FileInputSplit EOS = new FileInputSplit(-1, null, 
-1, -1, null);
-
        private FileInputFormat<OUT> format;
        private TypeSerializer<OUT> serializer;
 
        private transient Object checkpointLock;
 
-       private transient SplitReader<S, OUT> reader;
+       private transient SplitReader<OUT> reader;
        private transient SourceFunction.SourceContext<OUT> readerContext;
-       private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
+       private List<TimestampedFileInputSplit> restoredReaderState;
 
        public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
                this.format = checkNotNull(format);
@@ -110,13 +106,13 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                        timeCharacteristic, getProcessingTimeService(), 
checkpointLock, output, watermarkInterval);
 
                // and initialize the split reading thread
-               this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, readerState);
-               this.readerState = null;
+               this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, restoredReaderState);
+               this.restoredReaderState = null;
                this.reader.start();
        }
 
        @Override
-       public void processElement(StreamRecord<FileInputSplit> element) throws 
Exception {
+       public void processElement(StreamRecord<TimestampedFileInputSplit> 
element) throws Exception {
                reader.addSplit(element.getValue());
        }
 
@@ -157,7 +153,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                }
                reader = null;
                readerContext = null;
-               readerState = null;
+               restoredReaderState = null;
                format = null;
                serializer = null;
        }
@@ -190,7 +186,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                output.close();
        }
 
-       private class SplitReader<S extends Serializable, OT> extends Thread {
+       private class SplitReader<OT> extends Thread {
 
                private volatile boolean isRunning;
 
@@ -200,44 +196,39 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                private final Object checkpointLock;
                private final SourceFunction.SourceContext<OT> readerContext;
 
-               private final Queue<FileInputSplit> pendingSplits;
-
-               private FileInputSplit currentSplit = null;
+               private final Queue<TimestampedFileInputSplit> pendingSplits;
 
-               private S restoredFormatState = null;
+               private TimestampedFileInputSplit currentSplit;
 
-               private volatile boolean isSplitOpen = false;
+               private volatile boolean isSplitOpen;
 
                private SplitReader(FileInputFormat<OT> format,
                                        TypeSerializer<OT> serializer,
                                        SourceFunction.SourceContext<OT> 
readerContext,
                                        Object checkpointLock,
-                                       Tuple3<List<FileInputSplit>, 
FileInputSplit, S> restoredState) {
+                                       List<TimestampedFileInputSplit> 
restoredState) {
 
                        this.format = checkNotNull(format, "Unspecified 
FileInputFormat.");
                        this.serializer = checkNotNull(serializer, "Unspecified 
Serializer.");
                        this.readerContext = checkNotNull(readerContext, 
"Unspecified Reader Context.");
                        this.checkpointLock = checkNotNull(checkpointLock, 
"Unspecified checkpoint lock.");
 
-                       this.pendingSplits = new ArrayDeque<>();
                        this.isRunning = true;
 
-                       // this is the case where a task recovers from a 
previous failed attempt
-                       if (restoredState != null) {
-                               List<FileInputSplit> pending = restoredState.f0;
-                               FileInputSplit current = restoredState.f1;
-                               S formatState = restoredState.f2;
-
-                               for (FileInputSplit split : pending) {
-                                       pendingSplits.add(split);
+                       this.pendingSplits = new PriorityQueue<>(10, new 
Comparator<TimestampedFileInputSplit>() {
+                               @Override
+                               public int compare(TimestampedFileInputSplit 
o1, TimestampedFileInputSplit o2) {
+                                       return o1.compareTo(o2);
                                }
+                       });
 
-                               this.currentSplit = current;
-                               this.restoredFormatState = formatState;
+                       // this is the case where a task recovers from a 
previous failed attempt
+                       if (restoredState != null) {
+                               this.pendingSplits.addAll(restoredState);
                        }
                }
 
-               private void addSplit(FileInputSplit split) {
+               private void addSplit(TimestampedFileInputSplit split) {
                        checkNotNull(split, "Cannot insert a null value in the 
pending splits queue.");
                        synchronized (checkpointLock) {
                                this.pendingSplits.add(split);
@@ -259,43 +250,32 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
                                        synchronized (checkpointLock) {
 
-                                               if (this.currentSplit != null) {
-
-                                                       if 
(currentSplit.equals(EOS)) {
-                                                               isRunning = 
false;
-                                                               break;
-                                                       }
-
-                                                       if (this.format 
instanceof CheckpointableInputFormat && restoredFormatState != null) {
-
-                                                               
@SuppressWarnings("unchecked")
-                                                               
CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
-                                                                               
(CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
-                                                               
checkpointableFormat.reopen(currentSplit, restoredFormatState);
-                                                       } else {
-                                                               // this is the 
case of a non-checkpointable input format that will reprocess the last split.
-                                                               
LOG.info("Format " + this.format.getClass().getName() + " does not support 
checkpointing.");
-                                                               
format.open(currentSplit);
-                                                       }
-                                                       // reset the restored 
state to null for the next iteration
-                                                       
this.restoredFormatState = null;
-                                               } else {
-
-                                                       // get the next split 
to read.
+                                               if (currentSplit == null) {
                                                        currentSplit = 
this.pendingSplits.poll();
-
                                                        if (currentSplit == 
null) {
                                                                
checkpointLock.wait(50);
                                                                continue;
                                                        }
+                                               }
 
-                                                       if 
(currentSplit.equals(EOS)) {
-                                                               isRunning = 
false;
-                                                               break;
-                                                       }
+                                               if (currentSplit.equals(EOS)) {
+                                                       isRunning = false;
+                                                       break;
+                                               }
+
+                                               if (this.format instanceof 
CheckpointableInputFormat && currentSplit.getSplitState() != null) {
+                                                       // recovering after a 
node failure with an input
+                                                       // format that supports 
resetting the offset
+                                                       
((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) 
this.format).
+                                                               
reopen(currentSplit, currentSplit.getSplitState());
+                                               } else {
+                                                       // we either have a new 
split, or we recovered from a node
+                                                       // failure but the 
input format does not support resetting the offset.
                                                        
this.format.open(currentSplit);
                                                }
+
+                                               // reset the restored state to 
null for the next iteration
+                                               
this.currentSplit.resetSplitState();
                                                this.isSplitOpen = true;
                                        }
 
@@ -348,34 +328,17 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                        }
                }
 
-               private Tuple3<List<FileInputSplit>, FileInputSplit, S> 
getReaderState() throws IOException {
-                       List<FileInputSplit> snapshot = new 
ArrayList<>(this.pendingSplits.size());
-                       for (FileInputSplit split: this.pendingSplits) {
-                               snapshot.add(split);
-                       }
-
-                       // remove the current split from the list if inside.
-                       if (this.currentSplit != null && 
this.currentSplit.equals(pendingSplits.peek())) {
-                               this.pendingSplits.remove();
-                       }
-
-                       if (this.currentSplit != null) {
-                               if (this.format instanceof 
CheckpointableInputFormat) {
-                                       @SuppressWarnings("unchecked")
-                                       
CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
-                                                       
(CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
-                                       S formatState = this.isSplitOpen ?
-                                                       
checkpointableFormat.getCurrentState() :
-                                                       restoredFormatState;
-                                       return new Tuple3<>(snapshot, 
currentSplit, formatState);
-                               } else {
-                                       LOG.info("The format does not support 
checkpointing. The current input split will be re-read from start upon 
recovery.");
-                                       return new Tuple3<>(snapshot, 
currentSplit, null);
+               private List<TimestampedFileInputSplit> getReaderState() throws 
IOException {
+                       List<TimestampedFileInputSplit> snapshot = new 
ArrayList<>(this.pendingSplits.size());
+                       if (currentSplit != null ) {
+                               if (this.format instanceof 
CheckpointableInputFormat && this.isSplitOpen) {
+                                       Serializable formatState = 
((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) 
this.format).getCurrentState();
+                                       
this.currentSplit.setSplitState(formatState);
                                }
-                       } else {
-                               return new Tuple3<>(snapshot, null, null);
+                               snapshot.add(this.currentSplit);
                        }
+                       snapshot.addAll(this.pendingSplits);
+                       return snapshot;
                }
 
                public void cancel() {
@@ -389,45 +352,27 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
        public void snapshotState(FSDataOutputStream os, long checkpointId, 
long timestamp) throws Exception {
                final ObjectOutputStream oos = new ObjectOutputStream(os);
 
-               Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = 
this.reader.getReaderState();
-               List<FileInputSplit> pendingSplits = readerState.f0;
-               FileInputSplit currSplit = readerState.f1;
-               S formatState = readerState.f2;
-
-               // write the current split
-               oos.writeObject(currSplit);
-               oos.writeInt(pendingSplits.size());
-               for (FileInputSplit split : pendingSplits) {
+               List<TimestampedFileInputSplit> readerState = 
this.reader.getReaderState();
+               oos.writeInt(readerState.size());
+               for (TimestampedFileInputSplit split : readerState) {
                        oos.writeObject(split);
                }
-
-               // write the state of the reading channel
-               oos.writeObject(formatState);
                oos.flush();
        }
 
        @Override
        public void restoreState(FSDataInputStream is) throws Exception {
-               final ObjectInputStream ois = new ObjectInputStream(is);
 
-               // read the split that was being read
-               FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+               checkState(this.restoredReaderState == null,
+                       "The reader state has already been initialized.");
+
+               final ObjectInputStream ois = new ObjectInputStream(is);
 
-               // read the pending splits list
-               List<FileInputSplit> pendingSplits = new ArrayList<>();
                int noOfSplits = ois.readInt();
+               List<TimestampedFileInputSplit> pendingSplits = new 
ArrayList<>(noOfSplits);
                for (int i = 0; i < noOfSplits; i++) {
-                       FileInputSplit split = (FileInputSplit) 
ois.readObject();
-                       pendingSplits.add(split);
+                       pendingSplits.add((TimestampedFileInputSplit) 
ois.readObject());
                }
-
-               // read the state of the format
-               @SuppressWarnings("unchecked")
-               S formatState = (S) ois.readObject();
-
-               // set the whole reader state for the open() to find.
-               checkState(this.readerState == null, "The reader state has 
already been initialized.");
-
-               this.readerState = new Tuple3<>(pendingSplits, currSplit, 
formatState);
+               this.restoredReaderState = pendingSplits;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
index cdbeb2b..f8c4fba 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
@@ -20,12 +20,15 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * Specifies when the computation of the {@link 
ContinuousFileMonitoringFunction}
- * will be triggered.
+ * The mode in which the {@link ContinuousFileMonitoringFunction} operates.
+ * This can be either {@link #PROCESS_ONCE} or {@link #PROCESS_CONTINUOUSLY}.
  */
 @PublicEvolving
 public enum FileProcessingMode {
 
-       PROCESS_ONCE,                           // Processes the current 
content of a file/path only ONCE, and stops monitoring.
-       PROCESS_CONTINUOUSLY            // Reprocesses the whole file when new 
data is appended.
+       /** Processes the current contents of the path and exits. */
+       PROCESS_ONCE,
+
+       /** Periodically scans the path for new data. */
+       PROCESS_CONTINUOUSLY
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
new file mode 100644
index 0000000..323b3ab
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the 
checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class TimestampedFileInputSplit extends FileInputSplit implements 
Comparable<TimestampedFileInputSplit>{
+
+       /** The modification time of the file this split belongs to. */
+       private final long modificationTime;
+
+       /**
+        * The state of the split. This information is used when
+        * restoring from a checkpoint and allows to resume reading the
+        * underlying file from the point we left off.
+        * */
+       private Serializable splitState;
+
+       /** A special {@link TimestampedFileInputSplit} signaling the end of 
the stream of splits.*/
+       public static final TimestampedFileInputSplit EOS =
+               new TimestampedFileInputSplit(Long.MIN_VALUE, -1, null, -1, -1, 
null);
+
+       /**
+        * Creates a {@link TimestampedFileInputSplit} based on the file 
modification time and
+        * the rest of the information of the {@link FileInputSplit}, as 
returned by the
+        * underlying filesystem.
+        *
+        * @param modificationTime the modification file of the file this split 
belongs to
+        * @param num    the number of this input split
+        * @param file   the file name
+        * @param start  the position of the first byte in the file to process
+        * @param length the number of bytes in the file to process (-1 is flag 
for "read whole file")
+        * @param hosts  the list of hosts containing the block, possibly 
{@code null}
+        */
+       public TimestampedFileInputSplit(long modificationTime, int num, Path 
file, long start, long length, String[] hosts) {
+               super(num, file, start, length, hosts);
+
+               Preconditions.checkArgument(modificationTime >= 0 || 
modificationTime == Long.MIN_VALUE,
+                       "Invalid File Split Modification Time: "+ 
modificationTime +".");
+
+               this.modificationTime = modificationTime;
+       }
+
+       /**
+        * Sets the state of the split. This information is used when
+        * restoring from a checkpoint and allows to resume reading the
+        * underlying file from the point we left off.
+        * <p>
+        * This is applicable to {@link 
org.apache.flink.api.common.io.FileInputFormat FileInputFormats}
+        * that implement the {@link 
org.apache.flink.api.common.io.CheckpointableInputFormat
+        * CheckpointableInputFormat} interface.
+        * */
+       public void setSplitState(Serializable state) {
+               this.splitState = state;
+       }
+
+       /**
+        * Sets the state of the split to {@code null}.
+        */
+       public void resetSplitState() {
+               this.setSplitState(null);
+       }
+
+       /** @return the state of the split. */
+       public Serializable getSplitState() {
+               return this.splitState;
+       }
+
+       /** @return The modification time of the file this split belongs to. */
+       public long getModificationTime() {
+               return this.modificationTime;
+       }
+
+       @Override
+       public int compareTo(TimestampedFileInputSplit o) {
+               long modTimeComp = this.modificationTime - o.modificationTime;
+               if (modTimeComp != 0L) {
+                       // we cannot just cast the modTimeComp to int
+                       // because it may overflow
+                       return modTimeComp > 0 ? 1 : -1;
+               }
+
+               int pathComp = this.getPath().compareTo(o.getPath());
+               return pathComp != 0 ? pathComp : this.getSplitNumber() - 
o.getSplitNumber();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               } else if (o != null && o instanceof TimestampedFileInputSplit 
&& super.equals(o)) {
+                       TimestampedFileInputSplit that = 
(TimestampedFileInputSplit) o;
+                       return this.modificationTime == that.modificationTime;
+               }
+               return false;
+       }
+
+       @Override
+       public int hashCode() {
+               int res = 37 * (int)(this.modificationTime ^ 
(this.modificationTime >>> 32));
+               return 37 * res + super.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return "[" + getSplitNumber() + "] " + getPath() +" mod@ "+
+                       modificationTime + " : " + getStart() + " + " + 
getLength();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index a265c0a..0e9b054 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -57,18 +57,17 @@ import static org.junit.Assert.fail;
 
 public class ContinuousFileProcessingCheckpointITCase extends 
StreamFaultToleranceTestBase {
 
-       private static final int NO_OF_FILES = 9;
-       private static final int LINES_PER_FILE = 200;
+       private static final int NO_OF_FILES = 5;
+       private static final int LINES_PER_FILE = 150;
        private static final int NO_OF_RETRIES = 3;
-       private static final int PARALLELISM = 4;
-       private static final long INTERVAL = 2000;
+       private static final long INTERVAL = 100;
 
        private static File baseDir;
-       private static org.apache.hadoop.fs.FileSystem fs;
+       private static org.apache.hadoop.fs.FileSystem localFs;
        private static String localFsURI;
        private FileCreator fc;
 
-       private static  Map<Integer, List<String>> finalCollectedContent = new 
HashMap<>();
+       private static  Map<Integer, Set<String>> actualCollectedContent = new 
HashMap<>();
 
        @BeforeClass
        public static void createHDFS() {
@@ -79,7 +78,7 @@ public class ContinuousFileProcessingCheckpointITCase extends 
StreamFaultToleran
                        org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
 
                        localFsURI = "file:///" + baseDir +"/";
-                       fs = new 
org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
+                       localFs = new 
org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
 
                } catch(Throwable e) {
                        e.printStackTrace();
@@ -100,22 +99,22 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
        public void testProgram(StreamExecutionEnvironment env) {
 
                // set the restart strategy.
-               env.getConfig().setRestartStrategy(
-                       RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
-               env.enableCheckpointing(20);
-               env.setParallelism(PARALLELISM);
+               
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES,
 0));
+               env.enableCheckpointing(10);
 
                // create and start the file creating thread.
                fc = new FileCreator();
                fc.start();
 
                // create the monitoring source along with the necessary 
readers.
-               TestingSinkFunction sink = new TestingSinkFunction();
                TextInputFormat format = new TextInputFormat(new 
org.apache.flink.core.fs.Path(localFsURI));
                format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
                DataStream<String> inputStream = env.readFile(format, 
localFsURI,
                        FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
 
+               TestingSinkFunction sink = new TestingSinkFunction();
+
                inputStream.flatMap(new FlatMapFunction<String, String>() {
                        @Override
                        public void flatMap(String value, Collector<String> 
out) throws Exception {
@@ -126,12 +125,17 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
 
        @Override
        public void postSubmit() throws Exception {
-               Map<Integer, List<String>> collected = finalCollectedContent;
+
+               // be sure that the file creating thread is done.
+               fc.join();
+
+               Map<Integer, Set<String>> collected = actualCollectedContent;
                Assert.assertEquals(collected.size(), 
fc.getFileContent().size());
+
                for (Integer fileIdx: fc.getFileContent().keySet()) {
                        Assert.assertTrue(collected.keySet().contains(fileIdx));
 
-                       List<String> cntnt = collected.get(fileIdx);
+                       List<String> cntnt = new 
ArrayList<>(collected.get(fileIdx));
                        Collections.sort(cntnt, new Comparator<String>() {
                                @Override
                                public int compare(String o1, String o2) {
@@ -147,105 +151,34 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
                }
 
                collected.clear();
-               finalCollectedContent.clear();
+               actualCollectedContent.clear();
                fc.clean();
        }
 
        private int getLineNo(String line) {
                String[] tkns = line.split("\\s");
-               Assert.assertTrue(tkns.length == 6);
                return Integer.parseInt(tkns[tkns.length - 1]);
        }
 
-       // 
--------------------------------------------------------------------------------------------
-       //  Custom Functions
-       // 
--------------------------------------------------------------------------------------------
-
-       // -------------------------                    FILE CREATION           
        -------------------------------
-
-       /**
-        * A separate thread creating {@link #NO_OF_FILES} files, one file 
every {@link #INTERVAL} milliseconds.
-        * It serves for testing the file monitoring functionality of the 
{@link ContinuousFileMonitoringFunction}.
-        * The files are filled with data by the {@link #fillWithData(String, 
String, int, String)} method.
-        * */
-       private class FileCreator extends Thread {
-
-               private final Set<Path> filesCreated = new HashSet<>();
-               private final Map<Integer, String> fileContents = new 
HashMap<>();
-
-               public void run() {
-                       try {
-                               for(int i = 0; i < NO_OF_FILES; i++) {
-                                       Tuple2<org.apache.hadoop.fs.Path, 
String> file =
-                                               fillWithData(localFsURI, 
"file", i, "This is test line.");
-                                       filesCreated.add(file.f0);
-                                       fileContents.put(i, file.f1);
-
-                                       Thread.sleep((int) (INTERVAL / 
(3.0/2)));
-                               }
-                       } catch (IOException | InterruptedException e) {
-                               e.printStackTrace();
-                       }
-               }
-
-               void clean() throws IOException {
-                       assert (fs != null);
-                       for (org.apache.hadoop.fs.Path path: filesCreated) {
-                               fs.delete(path, false);
-                       }
-                       fileContents.clear();
-               }
-
-               Map<Integer, String> getFileContent() {
-                       return this.fileContents;
-               }
-       }
-
-       /**
-        * Fill the file with content and put the content in the {@code 
hdPathContents} list.
-        * */
-       private Tuple2<Path, String> fillWithData(
-               String base, String fileName, int fileIdx, String sampleLine) 
throws IOException {
-
-               assert (fs != null);
-
-               org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-
-               org.apache.hadoop.fs.Path tmp = new 
org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
-               FSDataOutputStream stream = fs.create(tmp);
-               StringBuilder str = new StringBuilder();
-               for(int i = 0; i < LINES_PER_FILE; i++) {
-                       String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-                       str.append(line);
-                       stream.write(line.getBytes());
-               }
-               stream.close();
-
-               Assert.assertTrue("Result file present", !fs.exists(file));
-               fs.rename(tmp, file);
-               Assert.assertTrue("No result file present", fs.exists(file));
-               return new Tuple2<>(file, str.toString());
-       }
-
        // --------------------------                   Task Sink               
        ------------------------------
 
        private static class TestingSinkFunction extends 
RichSinkFunction<String>
                implements Checkpointed<Tuple2<Long, Map<Integer, 
Set<String>>>>, CheckpointListener {
 
-               private static volatile boolean hasFailed = false;
+               private boolean hasFailed;
 
-               private volatile int numSuccessfulCheckpoints;
-
-               private long count;
+               private volatile boolean hasSuccessfulCheckpoints;
 
                private long elementsToFailure;
 
-               private long elementCounter = 0;
+               private long elementCounter;
 
-               private  Map<Integer, Set<String>> collectedContent = new 
HashMap<>();
+               private Map<Integer, Set<String>> actualContent = new 
HashMap<>();
 
                TestingSinkFunction() {
                        hasFailed = false;
+                       elementCounter = 0;
+                       hasSuccessfulCheckpoints = false;
                }
 
                @Override
@@ -257,74 +190,157 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
                        long failurePosMax = (long) (0.7 * LINES_PER_FILE);
 
                        elementsToFailure = (new Random().nextLong() % 
(failurePosMax - failurePosMin)) + failurePosMin;
-
-                       if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-                               finalCollectedContent = new HashMap<>();
-                               for (Map.Entry<Integer, Set<String>> result: 
collectedContent.entrySet()) {
-                                       
finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-                               }
-                               throw new SuccessException();
-                       }
-               }
-
-               @Override
-               public void close() {
-                       try {
-                               super.close();
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                       }
                }
 
                @Override
                public void invoke(String value) throws Exception {
-                       int fileIdx = 
Character.getNumericValue(value.charAt(0));
+                       int fileIdx = getFileIdx(value);
 
-                       Set<String> content = collectedContent.get(fileIdx);
+                       Set<String> content = actualContent.get(fileIdx);
                        if (content == null) {
                                content = new HashSet<>();
-                               collectedContent.put(fileIdx, content);
+                               actualContent.put(fileIdx, content);
                        }
 
+                       // detect duplicate lines.
                        if (!content.add(value + "\n")) {
                                fail("Duplicate line: " + value);
                                System.exit(0);
                        }
 
-
                        elementCounter++;
+
+                       // this is termination
                        if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-                               finalCollectedContent = new HashMap<>();
-                               for (Map.Entry<Integer, Set<String>> result: 
collectedContent.entrySet()) {
-                                       
finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-                               }
+                               actualCollectedContent = actualContent;
                                throw new SuccessException();
                        }
 
-                       count++;
-                       if (!hasFailed) {
-                               Thread.sleep(2);
-                               if (numSuccessfulCheckpoints >= 1 && count >= 
elementsToFailure) {
-                                       hasFailed = true;
-                                       throw new Exception("Task Failure");
-                               }
+                       // add some latency so that we have at least one 
checkpoint in
+                       if (!hasFailed && !hasSuccessfulCheckpoints) {
+                               Thread.sleep(5);
+                       }
+
+                       // simulate a node failure
+                       if (!hasFailed && hasSuccessfulCheckpoints && 
elementCounter >= elementsToFailure) {
+                               throw new Exception("Task Failure @ elem: " + 
elementCounter + " / " + elementsToFailure);
+                       }
+               }
+
+               @Override
+               public void close() {
+                       try {
+                               super.close();
+                       } catch (Exception e) {
+                               e.printStackTrace();
                        }
                }
 
                @Override
                public Tuple2<Long, Map<Integer, Set<String>>> 
snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-                       return new Tuple2<>(elementCounter, collectedContent);
+                       return new Tuple2<>(elementCounter, actualContent);
                }
 
                @Override
                public void restoreState(Tuple2<Long, Map<Integer, 
Set<String>>> state) throws Exception {
+                       this.hasFailed = true;
                        this.elementCounter = state.f0;
-                       this.collectedContent = state.f1;
+                       this.actualContent = state.f1;
                }
 
                @Override
                public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-                       numSuccessfulCheckpoints++;
+                       hasSuccessfulCheckpoints = true;
                }
+
+               private int getFileIdx(String line) {
+                       String[] tkns = line.split(":");
+                       return Integer.parseInt(tkns[0]);
+               }
+       }
+
+       // -------------------------                    FILE CREATION           
        -------------------------------
+
+       /**
+        * A separate thread creating {@link #NO_OF_FILES} files, one file 
every {@link #INTERVAL} milliseconds.
+        * It serves for testing the file monitoring functionality of the 
{@link ContinuousFileMonitoringFunction}.
+        * The files are filled with data by the {@link #fillWithData(String, 
String, int, String)} method.
+        * */
+       private class FileCreator extends Thread {
+
+               private final Set<Path> filesCreated = new HashSet<>();
+               private final Map<Integer, String> fileContents = new 
HashMap<>();
+
+               /** The modification time of the last created file. */
+               private long lastCreatedModTime = Long.MIN_VALUE;
+
+               public void run() {
+                       try {
+                               for(int i = 0; i < NO_OF_FILES; i++) {
+                                       Tuple2<org.apache.hadoop.fs.Path, 
String> tmpFile;
+                                       long modTime;
+                                       do {
+
+                                               // give it some time so that 
the files have
+                                               // different modification 
timestamps.
+                                               Thread.sleep(50);
+
+                                               tmpFile = 
fillWithData(localFsURI, "file", i, "This is test line.");
+
+                                               modTime = 
localFs.getFileStatus(tmpFile.f0).getModificationTime();
+                                               if (modTime <= 
lastCreatedModTime) {
+                                                       // delete the last 
created file to recreate it with a different timestamp
+                                                       
localFs.delete(tmpFile.f0, false);
+                                               }
+                                       } while (modTime <= lastCreatedModTime);
+                                       lastCreatedModTime = modTime;
+
+                                       // rename the file
+                                       org.apache.hadoop.fs.Path file =
+                                               new 
org.apache.hadoop.fs.Path(localFsURI + "/file" + i);
+                                       localFs.rename(tmpFile.f0, file);
+                                       Assert.assertTrue(localFs.exists(file));
+
+                                       filesCreated.add(file);
+                                       fileContents.put(i, tmpFile.f1);
+                               }
+                       } catch (IOException | InterruptedException e) {
+                               e.printStackTrace();
+                       }
+               }
+
+               void clean() throws IOException {
+                       assert (localFs != null);
+                       for (org.apache.hadoop.fs.Path path: filesCreated) {
+                               localFs.delete(path, false);
+                       }
+                       fileContents.clear();
+               }
+
+               Map<Integer, String> getFileContent() {
+                       return this.fileContents;
+               }
+       }
+
+       /**
+        * Fill the file with content and put the content in the {@code 
hdPathContents} list.
+        * */
+       private Tuple2<Path, String> fillWithData(
+               String base, String fileName, int fileIdx, String sampleLine) 
throws IOException, InterruptedException {
+
+               assert (localFs != null);
+
+               org.apache.hadoop.fs.Path tmp =
+                       new org.apache.hadoop.fs.Path(base + "/." + fileName + 
fileIdx);
+
+               FSDataOutputStream stream = localFs.create(tmp);
+               StringBuilder str = new StringBuilder();
+               for(int i = 0; i < LINES_PER_FILE; i++) {
+                       String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+                       str.append(line);
+                       stream.write(line.getBytes());
+               }
+               stream.close();
+               return new Tuple2<>(tmp, str.toString());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
new file mode 100644
index 0000000..88bd822
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TimestampedFileInputSplitTest {
+
+       @Test
+       public void testSplitEquality() {
+
+               TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS;
+               TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS;
+
+               Assert.assertEquals(eos1, eos2);
+
+               TimestampedFileInputSplit richFirstSplit =
+                       new TimestampedFileInputSplit(10, 2, new Path("test"), 
0, 100, null);
+               Assert.assertNotEquals(eos1, richFirstSplit);
+
+               TimestampedFileInputSplit richSecondSplit =
+                       new TimestampedFileInputSplit(10, 2, new Path("test"), 
0, 100, null);
+               Assert.assertEquals(richFirstSplit, richSecondSplit);
+
+               TimestampedFileInputSplit richModSecondSplit =
+                       new TimestampedFileInputSplit(11, 2, new Path("test"), 
0, 100, null);
+               Assert.assertNotEquals(richSecondSplit, richModSecondSplit);
+
+               TimestampedFileInputSplit richThirdSplit =
+                       new TimestampedFileInputSplit(10, 2, new 
Path("test/test1"), 0, 100, null);
+               Assert.assertEquals(richThirdSplit.getModificationTime(), 10);
+               Assert.assertNotEquals(richFirstSplit, richThirdSplit);
+
+               TimestampedFileInputSplit richThirdSplitCopy =
+                       new TimestampedFileInputSplit(10, 2, new 
Path("test/test1"), 0, 100, null);
+               Assert.assertEquals(richThirdSplitCopy, richThirdSplit);
+       }
+
+       @Test
+       public void testSplitComparison() {
+               TimestampedFileInputSplit richFirstSplit =
+                       new TimestampedFileInputSplit(10, 3, new 
Path("test/test1"), 0, 100, null);
+
+               TimestampedFileInputSplit richSecondSplit =
+                       new TimestampedFileInputSplit(10, 2, new 
Path("test/test2"), 0, 100, null);
+
+               TimestampedFileInputSplit richThirdSplit =
+                       new TimestampedFileInputSplit(10, 1, new 
Path("test/test2"), 0, 100, null);
+
+               TimestampedFileInputSplit richForthSplit =
+                       new TimestampedFileInputSplit(11, 0, new 
Path("test/test3"), 0, 100, null);
+
+               // lexicographically on the path order
+               Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 
0);
+               Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0);
+
+               // same mod time, same file so smaller split number first
+               Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 
0);
+
+               // smaller modification time first
+               Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
+       }
+
+       @Test
+       public void testIllegalArgument() {
+               try {
+                       new TimestampedFileInputSplit(-10, 2, new Path("test"), 
0, 100, null); // invalid modification time
+               } catch (Exception e) {
+                       if (!(e instanceof IllegalArgumentException)) {
+                               Assert.fail(e.getMessage());
+                       }
+               }
+       }
+}

Reply via email to