http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 2f0a16a..c8e9846 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -22,11 +22,9 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -43,44 +41,42 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.PriorityQueue; import java.util.Queue; +import static org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; /** 
- * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from - * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have parallelism - * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} which has - * a parallelism of 1. + * The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding + * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction} + * which has a parallelism of 1, this operator can have DOP > 1. * <p/> - * This operator will receive the split descriptors, put them in a queue, and have another - * thread read the actual data from the split. This architecture allows the separation of the - * reading thread, from the one emitting the checkpoint barriers, thus removing any potential + * As soon as a split descriptor is received, it is put in a queue, and another + * thread reads the actual data of the split. This architecture allows the separation of the + * reading thread from the one emitting the checkpoint barriers, thus removing any potential * back-pressure. */ @Internal -public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator { +public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class); - /** A value that serves as a kill-pill to stop the reading thread when no more splits remain. 
*/ - private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null); - private FileInputFormat<OUT> format; private TypeSerializer<OUT> serializer; private transient Object checkpointLock; - private transient SplitReader<S, OUT> reader; + private transient SplitReader<OUT> reader; private transient SourceFunction.SourceContext<OUT> readerContext; - private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState; + private List<TimestampedFileInputSplit> restoredReaderState; public ContinuousFileReaderOperator(FileInputFormat<OUT> format) { this.format = checkNotNull(format); @@ -110,13 +106,13 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval); // and initialize the split reading thread - this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState); - this.readerState = null; + this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState); + this.restoredReaderState = null; this.reader.start(); } @Override - public void processElement(StreamRecord<FileInputSplit> element) throws Exception { + public void processElement(StreamRecord<TimestampedFileInputSplit> element) throws Exception { reader.addSplit(element.getValue()); } @@ -157,7 +153,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } reader = null; readerContext = null; - readerState = null; + restoredReaderState = null; format = null; serializer = null; } @@ -190,7 +186,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A output.close(); } - private class SplitReader<S extends Serializable, OT> extends Thread { + private class SplitReader<OT> extends Thread { private volatile boolean isRunning; @@ -200,44 +196,39 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private final Object 
checkpointLock; private final SourceFunction.SourceContext<OT> readerContext; - private final Queue<FileInputSplit> pendingSplits; - - private FileInputSplit currentSplit = null; + private final Queue<TimestampedFileInputSplit> pendingSplits; - private S restoredFormatState = null; + private TimestampedFileInputSplit currentSplit; - private volatile boolean isSplitOpen = false; + private volatile boolean isSplitOpen; private SplitReader(FileInputFormat<OT> format, TypeSerializer<OT> serializer, SourceFunction.SourceContext<OT> readerContext, Object checkpointLock, - Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) { + List<TimestampedFileInputSplit> restoredState) { this.format = checkNotNull(format, "Unspecified FileInputFormat."); this.serializer = checkNotNull(serializer, "Unspecified Serializer."); this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); - this.pendingSplits = new ArrayDeque<>(); this.isRunning = true; - // this is the case where a task recovers from a previous failed attempt - if (restoredState != null) { - List<FileInputSplit> pending = restoredState.f0; - FileInputSplit current = restoredState.f1; - S formatState = restoredState.f2; - - for (FileInputSplit split : pending) { - pendingSplits.add(split); + this.pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() { + @Override + public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) { + return o1.compareTo(o2); } + }); - this.currentSplit = current; - this.restoredFormatState = formatState; + // this is the case where a task recovers from a previous failed attempt + if (restoredState != null) { + this.pendingSplits.addAll(restoredState); } } - private void addSplit(FileInputSplit split) { + private void addSplit(TimestampedFileInputSplit split) { checkNotNull(split, "Cannot insert a null value in the pending splits 
queue."); synchronized (checkpointLock) { this.pendingSplits.add(split); @@ -259,43 +250,32 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A synchronized (checkpointLock) { - if (this.currentSplit != null) { - - if (currentSplit.equals(EOS)) { - isRunning = false; - break; - } - - if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) { - - @SuppressWarnings("unchecked") - CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat = - (CheckpointableInputFormat<FileInputSplit, S>) this.format; - - checkpointableFormat.reopen(currentSplit, restoredFormatState); - } else { - // this is the case of a non-checkpointable input format that will reprocess the last split. - LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing."); - format.open(currentSplit); - } - // reset the restored state to null for the next iteration - this.restoredFormatState = null; - } else { - - // get the next split to read. + if (currentSplit == null) { currentSplit = this.pendingSplits.poll(); - if (currentSplit == null) { checkpointLock.wait(50); continue; } + } - if (currentSplit.equals(EOS)) { - isRunning = false; - break; - } + if (currentSplit.equals(EOS)) { + isRunning = false; + break; + } + + if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) { + // recovering after a node failure with an input + // format that supports resetting the offset + ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format). + reopen(currentSplit, currentSplit.getSplitState()); + } else { + // we either have a new split, or we recovered from a node + // failure but the input format does not support resetting the offset. 
this.format.open(currentSplit); } + + // reset the restored state to null for the next iteration + this.currentSplit.resetSplitState(); this.isSplitOpen = true; } @@ -348,34 +328,17 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } } - private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException { - List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size()); - for (FileInputSplit split: this.pendingSplits) { - snapshot.add(split); - } - - // remove the current split from the list if inside. - if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) { - this.pendingSplits.remove(); - } - - if (this.currentSplit != null) { - if (this.format instanceof CheckpointableInputFormat) { - @SuppressWarnings("unchecked") - CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat = - (CheckpointableInputFormat<FileInputSplit, S>) this.format; - - S formatState = this.isSplitOpen ? - checkpointableFormat.getCurrentState() : - restoredFormatState; - return new Tuple3<>(snapshot, currentSplit, formatState); - } else { - LOG.info("The format does not support checkpointing. 
The current input split will be re-read from start upon recovery."); - return new Tuple3<>(snapshot, currentSplit, null); + private List<TimestampedFileInputSplit> getReaderState() throws IOException { + List<TimestampedFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size()); + if (currentSplit != null ) { + if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { + Serializable formatState = ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState(); + this.currentSplit.setSplitState(formatState); } - } else { - return new Tuple3<>(snapshot, null, null); + snapshot.add(this.currentSplit); } + snapshot.addAll(this.pendingSplits); + return snapshot; } public void cancel() { @@ -389,45 +352,27 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception { final ObjectOutputStream oos = new ObjectOutputStream(os); - Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState(); - List<FileInputSplit> pendingSplits = readerState.f0; - FileInputSplit currSplit = readerState.f1; - S formatState = readerState.f2; - - // write the current split - oos.writeObject(currSplit); - oos.writeInt(pendingSplits.size()); - for (FileInputSplit split : pendingSplits) { + List<TimestampedFileInputSplit> readerState = this.reader.getReaderState(); + oos.writeInt(readerState.size()); + for (TimestampedFileInputSplit split : readerState) { oos.writeObject(split); } - - // write the state of the reading channel - oos.writeObject(formatState); oos.flush(); } @Override public void restoreState(FSDataInputStream is) throws Exception { - final ObjectInputStream ois = new ObjectInputStream(is); - // read the split that was being read - FileInputSplit currSplit = (FileInputSplit) ois.readObject(); + checkState(this.restoredReaderState == null, + "The reader state has 
already been initialized."); + + final ObjectInputStream ois = new ObjectInputStream(is); - // read the pending splits list - List<FileInputSplit> pendingSplits = new ArrayList<>(); int noOfSplits = ois.readInt(); + List<TimestampedFileInputSplit> pendingSplits = new ArrayList<>(noOfSplits); for (int i = 0; i < noOfSplits; i++) { - FileInputSplit split = (FileInputSplit) ois.readObject(); - pendingSplits.add(split); + pendingSplits.add((TimestampedFileInputSplit) ois.readObject()); } - - // read the state of the format - @SuppressWarnings("unchecked") - S formatState = (S) ois.readObject(); - - // set the whole reader state for the open() to find. - checkState(this.readerState == null, "The reader state has already been initialized."); - - this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState); + this.restoredReaderState = pendingSplits; } }
http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java index cdbeb2b..f8c4fba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java @@ -20,12 +20,15 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.annotation.PublicEvolving; /** - * Specifies when the computation of the {@link ContinuousFileMonitoringFunction} - * will be triggered. + * The mode in which the {@link ContinuousFileMonitoringFunction} operates. + * This can be either {@link #PROCESS_ONCE} or {@link #PROCESS_CONTINUOUSLY}. */ @PublicEvolving public enum FileProcessingMode { - PROCESS_ONCE, // Processes the current content of a file/path only ONCE, and stops monitoring. - PROCESS_CONTINUOUSLY // Reprocesses the whole file when new data is appended. + /** Processes the current contents of the path and exits. */ + PROCESS_ONCE, + + /** Periodically scans the path for new data. 
*/ + PROCESS_CONTINUOUSLY } http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java new file mode 100644 index 0000000..323b3ab --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * An extended {@link FileInputSplit} that also includes information about: + * <ul> + * <li>The modification time of the file this split belongs to.</li> + * <li>When checkpointing, the state of the split at the moment of the checkpoint.</li> + * </ul> + * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * {@link ContinuousFileReaderOperator} to perform continuous file processing. + * */ +public class TimestampedFileInputSplit extends FileInputSplit implements Comparable<TimestampedFileInputSplit>{ + + /** The modification time of the file this split belongs to. */ + private final long modificationTime; + + /** + * The state of the split. This information is used when + * restoring from a checkpoint and allows to resume reading the + * underlying file from the point we left off. + * */ + private Serializable splitState; + + /** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits.*/ + public static final TimestampedFileInputSplit EOS = + new TimestampedFileInputSplit(Long.MIN_VALUE, -1, null, -1, -1, null); + + /** + * Creates a {@link TimestampedFileInputSplit} based on the file modification time and + * the rest of the information of the {@link FileInputSplit}, as returned by the + * underlying filesystem. 
+ * + * @param modificationTime the modification time of the file this split belongs to + * @param num the number of this input split + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process (-1 is flag for "read whole file") + * @param hosts the list of hosts containing the block, possibly {@code null} + */ + public TimestampedFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) { + super(num, file, start, length, hosts); + + Preconditions.checkArgument(modificationTime >= 0 || modificationTime == Long.MIN_VALUE, + "Invalid File Split Modification Time: "+ modificationTime +"."); + + this.modificationTime = modificationTime; + } + + /** + * Sets the state of the split. This information is used when + * restoring from a checkpoint and allows to resume reading the + * underlying file from the point we left off. + * <p> + * This is applicable to {@link org.apache.flink.api.common.io.FileInputFormat FileInputFormats} + * that implement the {@link org.apache.flink.api.common.io.CheckpointableInputFormat + * CheckpointableInputFormat} interface. + * */ + public void setSplitState(Serializable state) { + this.splitState = state; + } + + /** + * Sets the state of the split to {@code null}. + */ + public void resetSplitState() { + this.setSplitState(null); + } + + /** @return the state of the split. */ + public Serializable getSplitState() { + return this.splitState; + } + + /** @return The modification time of the file this split belongs to. */ + public long getModificationTime() { + return this.modificationTime; + } + + @Override + public int compareTo(TimestampedFileInputSplit o) { + long modTimeComp = this.modificationTime - o.modificationTime; + if (modTimeComp != 0L) { + // we cannot just cast the modTimeComp to int + // because it may overflow. NOTE(review): the long subtraction above can itself wrap around when one side is Long.MIN_VALUE (the EOS marker), which decides where EOS sorts - confirm this ordering is intended + return modTimeComp > 0 ? 
1 : -1; + } + + int pathComp = this.getPath().compareTo(o.getPath()); + return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o != null && o instanceof TimestampedFileInputSplit && super.equals(o)) { + TimestampedFileInputSplit that = (TimestampedFileInputSplit) o; + return this.modificationTime == that.modificationTime; + } + return false; + } + + @Override + public int hashCode() { + int res = 37 * (int)(this.modificationTime ^ (this.modificationTime >>> 32)); + return 37 * res + super.hashCode(); + } + + @Override + public String toString() { + return "[" + getSplitNumber() + "] " + getPath() +" mod@ "+ + modificationTime + " : " + getStart() + " + " + getLength(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java index a265c0a..0e9b054 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -57,18 +57,17 @@ import static org.junit.Assert.fail; public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase { - private static final int NO_OF_FILES = 9; - private static final int LINES_PER_FILE = 200; + private static final int NO_OF_FILES = 5; + private static final int LINES_PER_FILE = 150; private static final int NO_OF_RETRIES = 3; - private static final int PARALLELISM = 4; - private static final long INTERVAL = 
2000; + private static final long INTERVAL = 100; private static File baseDir; - private static org.apache.hadoop.fs.FileSystem fs; + private static org.apache.hadoop.fs.FileSystem localFs; private static String localFsURI; private FileCreator fc; - private static Map<Integer, List<String>> finalCollectedContent = new HashMap<>(); + private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>(); @BeforeClass public static void createHDFS() { @@ -79,7 +78,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); localFsURI = "file:///" + baseDir +"/"; - fs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf); + localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf); } catch(Throwable e) { e.printStackTrace(); @@ -100,22 +99,22 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran public void testProgram(StreamExecutionEnvironment env) { // set the restart strategy. - env.getConfig().setRestartStrategy( - RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0)); - env.enableCheckpointing(20); - env.setParallelism(PARALLELISM); + env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0)); + env.enableCheckpointing(10); // create and start the file creating thread. fc = new FileCreator(); fc.start(); // create the monitoring source along with the necessary readers. 
- TestingSinkFunction sink = new TestingSinkFunction(); TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI)); format.setFilesFilter(FilePathFilter.createDefaultFilter()); + DataStream<String> inputStream = env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL); + TestingSinkFunction sink = new TestingSinkFunction(); + inputStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { @@ -126,12 +125,17 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran @Override public void postSubmit() throws Exception { - Map<Integer, List<String>> collected = finalCollectedContent; + + // be sure that the file creating thread is done. + fc.join(); + + Map<Integer, Set<String>> collected = actualCollectedContent; Assert.assertEquals(collected.size(), fc.getFileContent().size()); + for (Integer fileIdx: fc.getFileContent().keySet()) { Assert.assertTrue(collected.keySet().contains(fileIdx)); - List<String> cntnt = collected.get(fileIdx); + List<String> cntnt = new ArrayList<>(collected.get(fileIdx)); Collections.sort(cntnt, new Comparator<String>() { @Override public int compare(String o1, String o2) { @@ -147,105 +151,34 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran } collected.clear(); - finalCollectedContent.clear(); + actualCollectedContent.clear(); fc.clean(); } private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertTrue(tkns.length == 6); return Integer.parseInt(tkns[tkns.length - 1]); } - // -------------------------------------------------------------------------------------------- - // Custom Functions - // -------------------------------------------------------------------------------------------- - - // ------------------------- FILE CREATION ------------------------------- - - /** - * A separate thread creating {@link 
#NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds. - * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}. - * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method. - * */ - private class FileCreator extends Thread { - - private final Set<Path> filesCreated = new HashSet<>(); - private final Map<Integer, String> fileContents = new HashMap<>(); - - public void run() { - try { - for(int i = 0; i < NO_OF_FILES; i++) { - Tuple2<org.apache.hadoop.fs.Path, String> file = - fillWithData(localFsURI, "file", i, "This is test line."); - filesCreated.add(file.f0); - fileContents.put(i, file.f1); - - Thread.sleep((int) (INTERVAL / (3.0/2))); - } - } catch (IOException | InterruptedException e) { - e.printStackTrace(); - } - } - - void clean() throws IOException { - assert (fs != null); - for (org.apache.hadoop.fs.Path path: filesCreated) { - fs.delete(path, false); - } - fileContents.clear(); - } - - Map<Integer, String> getFileContent() { - return this.fileContents; - } - } - - /** - * Fill the file with content and put the content in the {@code hdPathContents} list. - * */ - private Tuple2<Path, String> fillWithData( - String base, String fileName, int fileIdx, String sampleLine) throws IOException { - - assert (fs != null); - - org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx); - - org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." 
+ fileName + fileIdx); - FSDataOutputStream stream = fs.create(tmp); - StringBuilder str = new StringBuilder(); - for(int i = 0; i < LINES_PER_FILE; i++) { - String line = fileIdx +": "+ sampleLine + " " + i +"\n"; - str.append(line); - stream.write(line.getBytes()); - } - stream.close(); - - Assert.assertTrue("Result file present", !fs.exists(file)); - fs.rename(tmp, file); - Assert.assertTrue("No result file present", fs.exists(file)); - return new Tuple2<>(file, str.toString()); - } - // -------------------------- Task Sink ------------------------------ private static class TestingSinkFunction extends RichSinkFunction<String> implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener { - private static volatile boolean hasFailed = false; + private boolean hasFailed; - private volatile int numSuccessfulCheckpoints; - - private long count; + private volatile boolean hasSuccessfulCheckpoints; private long elementsToFailure; - private long elementCounter = 0; + private long elementCounter; - private Map<Integer, Set<String>> collectedContent = new HashMap<>(); + private Map<Integer, Set<String>> actualContent = new HashMap<>(); TestingSinkFunction() { hasFailed = false; + elementCounter = 0; + hasSuccessfulCheckpoints = false; } @Override @@ -257,74 +190,157 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran long failurePosMax = (long) (0.7 * LINES_PER_FILE); elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - - if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) { - finalCollectedContent = new HashMap<>(); - for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) { - finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue())); - } - throw new SuccessException(); - } - } - - @Override - public void close() { - try { - super.close(); - } catch (Exception e) { - e.printStackTrace(); - } } @Override public void invoke(String 
value) throws Exception { - int fileIdx = Character.getNumericValue(value.charAt(0)); + int fileIdx = getFileIdx(value); - Set<String> content = collectedContent.get(fileIdx); + Set<String> content = actualContent.get(fileIdx); if (content == null) { content = new HashSet<>(); - collectedContent.put(fileIdx, content); + actualContent.put(fileIdx, content); } + // detect duplicate lines. if (!content.add(value + "\n")) { fail("Duplicate line: " + value); System.exit(0); } - elementCounter++; + + // this is termination if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) { - finalCollectedContent = new HashMap<>(); - for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) { - finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue())); - } + actualCollectedContent = actualContent; throw new SuccessException(); } - count++; - if (!hasFailed) { - Thread.sleep(2); - if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) { - hasFailed = true; - throw new Exception("Task Failure"); - } + // add some latency so that we have at least one checkpoint in + if (!hasFailed && !hasSuccessfulCheckpoints) { + Thread.sleep(5); + } + + // simulate a node failure + if (!hasFailed && hasSuccessfulCheckpoints && elementCounter >= elementsToFailure) { + throw new Exception("Task Failure @ elem: " + elementCounter + " / " + elementsToFailure); + } + } + + @Override + public void close() { + try { + super.close(); + } catch (Exception e) { + e.printStackTrace(); } } @Override public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return new Tuple2<>(elementCounter, collectedContent); + return new Tuple2<>(elementCounter, actualContent); } @Override public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception { + this.hasFailed = true; this.elementCounter = state.f0; - this.collectedContent = state.f1; + this.actualContent = state.f1; } @Override 
public void notifyCheckpointComplete(long checkpointId) throws Exception { - numSuccessfulCheckpoints++; + hasSuccessfulCheckpoints = true; } + + private int getFileIdx(String line) { + String[] tkns = line.split(":"); + return Integer.parseInt(tkns[0]); + } + } + + // ------------------------- FILE CREATION ------------------------------- + + /** + * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds. + * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}. + * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method. + * */ + private class FileCreator extends Thread { + + private final Set<Path> filesCreated = new HashSet<>(); + private final Map<Integer, String> fileContents = new HashMap<>(); + + /** The modification time of the last created file. */ + private long lastCreatedModTime = Long.MIN_VALUE; + + public void run() { + try { + for(int i = 0; i < NO_OF_FILES; i++) { + Tuple2<org.apache.hadoop.fs.Path, String> tmpFile; + long modTime; + do { + + // give it some time so that the files have + // different modification timestamps. 
+ Thread.sleep(50); + + tmpFile = fillWithData(localFsURI, "file", i, "This is test line."); + + modTime = localFs.getFileStatus(tmpFile.f0).getModificationTime(); + if (modTime <= lastCreatedModTime) { + // delete the last created file to recreate it with a different timestamp + localFs.delete(tmpFile.f0, false); + } + } while (modTime <= lastCreatedModTime); + lastCreatedModTime = modTime; + + // rename the file + org.apache.hadoop.fs.Path file = + new org.apache.hadoop.fs.Path(localFsURI + "/file" + i); + localFs.rename(tmpFile.f0, file); + Assert.assertTrue(localFs.exists(file)); + + filesCreated.add(file); + fileContents.put(i, tmpFile.f1); + } + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + + void clean() throws IOException { + assert (localFs != null); + for (org.apache.hadoop.fs.Path path: filesCreated) { + localFs.delete(path, false); + } + fileContents.clear(); + } + + Map<Integer, String> getFileContent() { + return this.fileContents; + } + } + + /** + * Fill the file with content and put the content in the {@code hdPathContents} list. + * */ + private Tuple2<Path, String> fillWithData( + String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException { + + assert (localFs != null); + + org.apache.hadoop.fs.Path tmp = + new org.apache.hadoop.fs.Path(base + "/." 
+ fileName + fileIdx); + + FSDataOutputStream stream = localFs.create(tmp); + StringBuilder str = new StringBuilder(); + for(int i = 0; i < LINES_PER_FILE; i++) { + String line = fileIdx +": "+ sampleLine + " " + i +"\n"; + str.append(line); + stream.write(line.getBytes()); + } + stream.close(); + return new Tuple2<>(tmp, str.toString()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java new file mode 100644 index 0000000..88bd822 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.test.checkpointing; + +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.junit.Assert; +import org.junit.Test; + +public class TimestampedFileInputSplitTest { + + @Test + public void testSplitEquality() { + + TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS; + TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS; + + Assert.assertEquals(eos1, eos2); + + TimestampedFileInputSplit richFirstSplit = + new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null); + Assert.assertNotEquals(eos1, richFirstSplit); + + TimestampedFileInputSplit richSecondSplit = + new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null); + Assert.assertEquals(richFirstSplit, richSecondSplit); + + TimestampedFileInputSplit richModSecondSplit = + new TimestampedFileInputSplit(11, 2, new Path("test"), 0, 100, null); + Assert.assertNotEquals(richSecondSplit, richModSecondSplit); + + TimestampedFileInputSplit richThirdSplit = + new TimestampedFileInputSplit(10, 2, new Path("test/test1"), 0, 100, null); + Assert.assertEquals(richThirdSplit.getModificationTime(), 10); + Assert.assertNotEquals(richFirstSplit, richThirdSplit); + + TimestampedFileInputSplit richThirdSplitCopy = + new TimestampedFileInputSplit(10, 2, new Path("test/test1"), 0, 100, null); + Assert.assertEquals(richThirdSplitCopy, richThirdSplit); + } + + @Test + public void testSplitComparison() { + TimestampedFileInputSplit richFirstSplit = + new TimestampedFileInputSplit(10, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit richSecondSplit = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit richThirdSplit = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit richForthSplit = + new 
TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + // lexicographically on the path order + Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0); + Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0); + + // same mod time, same file so smaller split number first + Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0); + + // smaller modification time first + Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); + } + + @Test + public void testIllegalArgument() { + try { + new TimestampedFileInputSplit(-10, 2, new Path("test"), 0, 100, null); // invalid modification time + } catch (Exception e) { + if (!(e instanceof IllegalArgumentException)) { + Assert.fail(e.getMessage()); + } + } + } +}
