[FLINK-5021] Remove the special EOS TimestampedFileInputSplit.

With the special EOS split (which signaled that no more splits would
arrive) removed, the ContinuousFileReaderOperator now closes by
setting a flag on its reader thread that marks it as closed. The
reader exits once this flag is set and its pending split queue is empty.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98a61762
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98a61762
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98a61762

Branch: refs/heads/master
Commit: 98a6176280dd7b85dcd6fbacd324eb1056e23419
Parents: 9918839
Author: kl0u <[email protected]>
Authored: Thu Nov 3 10:50:04 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Nov 11 14:05:58 2016 +0100

----------------------------------------------------------------------
 .../source/ContinuousFileReaderOperator.java    | 53 ++++++++++----------
 .../source/TimestampedFileInputSplit.java       | 25 ++++-----
 .../TimestampedFileInputSplitTest.java          | 25 ---------
 3 files changed, 37 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 19e4737..db8e8fd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -42,12 +42,10 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
 
-import static 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -162,22 +160,18 @@ public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OU
        public void close() throws Exception {
                super.close();
 
-               // signal that no more splits will come, wait for the reader to 
finish
-               // and close the collector. Further cleaning up is handled by 
the dispose().
+               // close the reader to signal that no more splits will come. By 
doing this,
+               // the reader will exit as soon as it finishes processing the 
already pending splits.
+               // This method will wait until then. Further cleaning up is 
handled by the dispose().
 
                if (reader != null && reader.isAlive() && reader.isRunning()) {
-                       // add a dummy element to signal that no more splits 
will
-                       // arrive and wait until the reader finishes
-                       reader.addSplit(EOS);
-
-                       // we already have the checkpoint lock because close() 
is
-                       // called by the StreamTask while having it.
+                       reader.close();
                        checkpointLock.wait();
                }
 
-               // finally if we are closed normally and we are operating on
-               // event or ingestion time, emit the max watermark indicating
-               // the end of the stream, like a normal source would do.
+               // finally if we are operating on event or ingestion time,
+               // emit the long-max watermark indicating the end of the stream,
+               // like a normal source would do.
 
                if (readerContext != null) {
                        readerContext.emitWatermark(Watermark.MAX_WATERMARK);
@@ -188,6 +182,8 @@ public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OU
 
        private class SplitReader<OT> extends Thread {
 
+               private volatile boolean isClosed;
+
                private volatile boolean isRunning;
 
                private final FileInputFormat<OT> format;
@@ -213,14 +209,10 @@ public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OU
                        this.readerContext = checkNotNull(readerContext, 
"Unspecified Reader Context.");
                        this.checkpointLock = checkNotNull(checkpointLock, 
"Unspecified checkpoint lock.");
 
+                       this.isClosed = false;
                        this.isRunning = true;
 
-                       this.pendingSplits = new PriorityQueue<>(10, new 
Comparator<TimestampedFileInputSplit>() {
-                               @Override
-                               public int compare(TimestampedFileInputSplit 
o1, TimestampedFileInputSplit o2) {
-                                       return o1.compareTo(o2);
-                               }
-                       });
+                       this.pendingSplits = new PriorityQueue<>();
 
                        // this is the case where a task recovers from a 
previous failed attempt
                        if (restoredState != null) {
@@ -252,17 +244,21 @@ public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OU
 
                                                if (currentSplit == null) {
                                                        currentSplit = 
this.pendingSplits.poll();
+
+                                                       // if the list of 
pending splits is empty (currentSplit == null) then:
+                                                       //   1) if close() was 
called on the operator then exit the while loop
+                                                       //   2) if not wait 50 
ms and try again to fetch a new split to read
+
                                                        if (currentSplit == 
null) {
-                                                               
checkpointLock.wait(50);
+                                                               if 
(!this.isClosed) {
+                                                                       
checkpointLock.wait(50);
+                                                               } else {
+                                                                       
isRunning = false;
+                                                               }
                                                                continue;
                                                        }
                                                }
 
-                                               if (currentSplit.equals(EOS)) {
-                                                       isRunning = false;
-                                                       break;
-                                               }
-
                                                if (this.format instanceof 
CheckpointableInputFormat && currentSplit.getSplitState() != null) {
                                                        // recovering after a 
node failure with an input
                                                        // format that supports 
resetting the offset
@@ -332,7 +328,8 @@ public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OU
                        List<TimestampedFileInputSplit> snapshot = new 
ArrayList<>(this.pendingSplits.size());
                        if (currentSplit != null ) {
                                if (this.format instanceof 
CheckpointableInputFormat && this.isSplitOpen) {
-                                       Serializable formatState = 
((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) 
this.format).getCurrentState();
+                                       Serializable formatState =
+                                               
((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) 
this.format).getCurrentState();
                                        
this.currentSplit.setSplitState(formatState);
                                }
                                snapshot.add(this.currentSplit);
@@ -344,6 +341,10 @@ public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OU
                public void cancel() {
                        this.isRunning = false;
                }
+
+               public void close() {
+                       this.isClosed = true;
+               }
        }
 
        //      ---------------------                   Checkpointing           
        --------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
index 6a3ba0d..2a0be98 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -44,10 +44,6 @@ public class TimestampedFileInputSplit extends 
FileInputSplit implements Compara
         * */
        private Serializable splitState;
 
-       /** A special {@link TimestampedFileInputSplit} signaling the end of 
the stream of splits.*/
-       public static final TimestampedFileInputSplit EOS =
-               new TimestampedFileInputSplit(Long.MAX_VALUE, -1, null, -1, -1, 
null);
-
        /**
         * Creates a {@link TimestampedFileInputSplit} based on the file 
modification time and
         * the rest of the information of the {@link FileInputSplit}, as 
returned by the
@@ -101,24 +97,23 @@ public class TimestampedFileInputSplit extends 
FileInputSplit implements Compara
 
        @Override
        public int compareTo(TimestampedFileInputSplit o) {
-               long modTimeComp = this.modificationTime - o.modificationTime;
+               int modTimeComp = Long.compare(this.modificationTime, 
o.modificationTime);
                if (modTimeComp != 0L) {
-                       // we cannot just cast the modTimeComp to int
-                       // because it may overflow
-                       return modTimeComp > 0 ? 1 : -1;
+                       return modTimeComp;
                }
 
-               // the file input split allows for null paths
-               if (this.getPath() == o.getPath()) {
-                       return 0;
-               } else if (this.getPath() == null) {
+               // the file input split does not prevent null paths.
+               if (this.getPath() == null && o.getPath() != null) {
                        return 1;
-               } else if (o.getPath() == null) {
+               } else if (this.getPath() != null && o.getPath() == null) {
                        return -1;
                }
 
-               int pathComp = this.getPath().compareTo(o.getPath());
-               return pathComp != 0 ? pathComp : this.getSplitNumber() - 
o.getSplitNumber();
+               int pathComp = this.getPath() == o.getPath() ? 0 :
+                       this.getPath().compareTo(o.getPath());
+
+               return pathComp != 0 ? pathComp :
+                       this.getSplitNumber() - o.getSplitNumber();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/98a61762/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index 9dc90d3..0a89ab9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -23,7 +23,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -33,14 +32,8 @@ public class TimestampedFileInputSplitTest {
        @Test
        public void testSplitEquality() {
 
-               TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS;
-               TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS;
-
-               Assert.assertEquals(eos1, eos2);
-
                TimestampedFileInputSplit richFirstSplit =
                        new TimestampedFileInputSplit(10, 2, new Path("test"), 
0, 100, null);
-               Assert.assertNotEquals(eos1, richFirstSplit);
 
                TimestampedFileInputSplit richSecondSplit =
                        new TimestampedFileInputSplit(10, 2, new Path("test"), 
0, 100, null);
@@ -88,18 +81,6 @@ public class TimestampedFileInputSplitTest {
 
                // smaller modification time first
                Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
-
-               
Assert.assertTrue(richFirstSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-               
Assert.assertTrue(richSecondSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-               
Assert.assertTrue(richThirdSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-               
Assert.assertTrue(richForthSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
-
-               Assert.assertEquals(0, 
TimestampedFileInputSplit.EOS.compareTo(TimestampedFileInputSplit.EOS));
-
-               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richFirstSplit) > 0);
-               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richSecondSplit) > 0);
-               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richThirdSplit) > 0);
-               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richForthSplit) > 0);
        }
 
        @Test
@@ -130,14 +111,10 @@ public class TimestampedFileInputSplitTest {
                TimestampedFileInputSplit richFifthSplit =
                        new TimestampedFileInputSplit(11, 1, new 
Path("test/test3"), 0, 100, null);
 
-               TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS;
-
                Queue<TimestampedFileInputSplit> pendingSplits = new 
PriorityQueue<>();
 
-               pendingSplits.add(eos);
                pendingSplits.add(richSecondSplit);
                pendingSplits.add(richForthSplit);
-               pendingSplits.add(eos);
                pendingSplits.add(richFirstSplit);
                pendingSplits.add(richFifthSplit);
                pendingSplits.add(richFifthSplit);
@@ -158,8 +135,6 @@ public class TimestampedFileInputSplitTest {
                expectedSortedSplits.add(richForthSplit);
                expectedSortedSplits.add(richFifthSplit);
                expectedSortedSplits.add(richFifthSplit);
-               expectedSortedSplits.add(eos);
-               expectedSortedSplits.add(eos);
 
                Assert.assertArrayEquals(expectedSortedSplits.toArray(), 
actualSortedSplits.toArray());
        }

Reply via email to