[FLINK-5021] Remove the special EOS TimestampedFileInputSplit. Instead of relying on this special split to signal that no more splits will arrive, the ContinuousFileReaderOperator now closes its reader by setting a flag that marks it as closed; the reader exits once this flag is set and its queue of pending splits is empty.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98a61762 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98a61762 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98a61762 Branch: refs/heads/master Commit: 98a6176280dd7b85dcd6fbacd324eb1056e23419 Parents: 9918839 Author: kl0u <[email protected]> Authored: Thu Nov 3 10:50:04 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Nov 11 14:05:58 2016 +0100 ---------------------------------------------------------------------- .../source/ContinuousFileReaderOperator.java | 53 ++++++++++---------- .../source/TimestampedFileInputSplit.java | 25 ++++----- .../TimestampedFileInputSplitTest.java | 25 --------- 3 files changed, 37 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 19e4737..db8e8fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -42,12 +42,10 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; -import static 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -162,22 +160,18 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU public void close() throws Exception { super.close(); - // signal that no more splits will come, wait for the reader to finish - // and close the collector. Further cleaning up is handled by the dispose(). + // close the reader to signal that no more splits will come. By doing this, + // the reader will exit as soon as it finishes processing the already pending splits. + // This method will wait until then. Further cleaning up is handled by the dispose(). if (reader != null && reader.isAlive() && reader.isRunning()) { - // add a dummy element to signal that no more splits will - // arrive and wait until the reader finishes - reader.addSplit(EOS); - - // we already have the checkpoint lock because close() is - // called by the StreamTask while having it. + reader.close(); checkpointLock.wait(); } - // finally if we are closed normally and we are operating on - // event or ingestion time, emit the max watermark indicating - // the end of the stream, like a normal source would do. + // finally if we are operating on event or ingestion time, + // emit the long-max watermark indicating the end of the stream, + // like a normal source would do. 
if (readerContext != null) { readerContext.emitWatermark(Watermark.MAX_WATERMARK); @@ -188,6 +182,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU private class SplitReader<OT> extends Thread { + private volatile boolean isClosed; + private volatile boolean isRunning; private final FileInputFormat<OT> format; @@ -213,14 +209,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); + this.isClosed = false; this.isRunning = true; - this.pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() { - @Override - public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) { - return o1.compareTo(o2); - } - }); + this.pendingSplits = new PriorityQueue<>(); // this is the case where a task recovers from a previous failed attempt if (restoredState != null) { @@ -252,17 +244,21 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU if (currentSplit == null) { currentSplit = this.pendingSplits.poll(); + + // if the list of pending splits is empty (currentSplit == null) then: + // 1) if close() was called on the operator then exit the while loop + // 2) if not wait 50 ms and try again to fetch a new split to read + if (currentSplit == null) { - checkpointLock.wait(50); + if (!this.isClosed) { + checkpointLock.wait(50); + } else { + isRunning = false; + } continue; } } - if (currentSplit.equals(EOS)) { - isRunning = false; - break; - } - if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) { // recovering after a node failure with an input // format that supports resetting the offset @@ -332,7 +328,8 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU List<TimestampedFileInputSplit> snapshot = new 
ArrayList<>(this.pendingSplits.size()); if (currentSplit != null ) { if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { - Serializable formatState = ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState(); + Serializable formatState = + ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState(); this.currentSplit.setSplitState(formatState); } snapshot.add(this.currentSplit); @@ -344,6 +341,10 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU public void cancel() { this.isRunning = false; } + + public void close() { + this.isClosed = true; + } } // --------------------- Checkpointing -------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java index 6a3ba0d..2a0be98 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java @@ -44,10 +44,6 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara * */ private Serializable splitState; - /** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits.*/ - public static final TimestampedFileInputSplit EOS = - new TimestampedFileInputSplit(Long.MAX_VALUE, -1, null, -1, -1, null); - /** * Creates a {@link TimestampedFileInputSplit} based on the file modification time and * the rest of the 
information of the {@link FileInputSplit}, as returned by the @@ -101,24 +97,23 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara @Override public int compareTo(TimestampedFileInputSplit o) { - long modTimeComp = this.modificationTime - o.modificationTime; + int modTimeComp = Long.compare(this.modificationTime, o.modificationTime); if (modTimeComp != 0L) { - // we cannot just cast the modTimeComp to int - // because it may overflow - return modTimeComp > 0 ? 1 : -1; + return modTimeComp; } - // the file input split allows for null paths - if (this.getPath() == o.getPath()) { - return 0; - } else if (this.getPath() == null) { + // the file input split does not prevent null paths. + if (this.getPath() == null && o.getPath() != null) { return 1; - } else if (o.getPath() == null) { + } else if (this.getPath() != null && o.getPath() == null) { return -1; } - int pathComp = this.getPath().compareTo(o.getPath()); - return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber(); + int pathComp = this.getPath() == o.getPath() ? 0 : + this.getPath().compareTo(o.getPath()); + + return pathComp != 0 ? 
pathComp : + this.getSplitNumber() - o.getSplitNumber(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java index 9dc90d3..0a89ab9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java @@ -23,7 +23,6 @@ import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; @@ -33,14 +32,8 @@ public class TimestampedFileInputSplitTest { @Test public void testSplitEquality() { - TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS; - TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS; - - Assert.assertEquals(eos1, eos2); - TimestampedFileInputSplit richFirstSplit = new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null); - Assert.assertNotEquals(eos1, richFirstSplit); TimestampedFileInputSplit richSecondSplit = new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null); @@ -88,18 +81,6 @@ public class TimestampedFileInputSplitTest { // smaller modification time first Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); - - Assert.assertTrue(richFirstSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - Assert.assertTrue(richSecondSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - Assert.assertTrue(richThirdSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - Assert.assertTrue(richForthSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - - 
Assert.assertEquals(0, TimestampedFileInputSplit.EOS.compareTo(TimestampedFileInputSplit.EOS)); - - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richFirstSplit) > 0); - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richSecondSplit) > 0); - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richThirdSplit) > 0); - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richForthSplit) > 0); } @Test @@ -130,14 +111,10 @@ public class TimestampedFileInputSplitTest { TimestampedFileInputSplit richFifthSplit = new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null); - TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS; - Queue<TimestampedFileInputSplit> pendingSplits = new PriorityQueue<>(); - pendingSplits.add(eos); pendingSplits.add(richSecondSplit); pendingSplits.add(richForthSplit); - pendingSplits.add(eos); pendingSplits.add(richFirstSplit); pendingSplits.add(richFifthSplit); pendingSplits.add(richFifthSplit); @@ -158,8 +135,6 @@ public class TimestampedFileInputSplitTest { expectedSortedSplits.add(richForthSplit); expectedSortedSplits.add(richFifthSplit); expectedSortedSplits.add(richFifthSplit); - expectedSortedSplits.add(eos); - expectedSortedSplits.add(eos); Assert.assertArrayEquals(expectedSortedSplits.toArray(), actualSortedSplits.toArray()); }
