Repository: flink
Updated Branches:
  refs/heads/master ba2d007e5 -> 59a5551ef


[hotfix] Fixes the TimestampedInputSplit.EOS comparison.

This closes #2718


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59a5551e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59a5551e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59a5551e

Branch: refs/heads/master
Commit: 59a5551ef81a57007035526ab2c14ec07f13971b
Parents: ba2d007
Author: kl0u <[email protected]>
Authored: Fri Oct 28 14:10:25 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon Oct 31 19:07:18 2016 +0100

----------------------------------------------------------------------
 .../source/TimestampedFileInputSplit.java       | 11 ++-
 .../TimestampedFileInputSplitTest.java          | 81 +++++++++++++++++++-
 2 files changed, 87 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59a5551e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
index 323b3ab..6a3ba0d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -46,7 +46,7 @@ public class TimestampedFileInputSplit extends FileInputSplit 
implements Compara
 
        /** A special {@link TimestampedFileInputSplit} signaling the end of 
the stream of splits.*/
        public static final TimestampedFileInputSplit EOS =
-               new TimestampedFileInputSplit(Long.MIN_VALUE, -1, null, -1, -1, 
null);
+               new TimestampedFileInputSplit(Long.MAX_VALUE, -1, null, -1, -1, 
null);
 
        /**
         * Creates a {@link TimestampedFileInputSplit} based on the file 
modification time and
@@ -108,6 +108,15 @@ public class TimestampedFileInputSplit extends 
FileInputSplit implements Compara
                        return modTimeComp > 0 ? 1 : -1;
                }
 
+               // the file input split allows for null paths
+               if (this.getPath() == o.getPath()) {
+                       return 0;
+               } else if (this.getPath() == null) {
+                       return 1;
+               } else if (o.getPath() == null) {
+                       return -1;
+               }
+
                int pathComp = this.getPath().compareTo(o.getPath());
                return pathComp != 0 ? pathComp : this.getSplitNumber() - 
o.getSplitNumber();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/59a5551e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index 88bd822..9dc90d3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -17,12 +17,17 @@
  */
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
 public class TimestampedFileInputSplitTest {
 
        @Test
@@ -58,7 +63,7 @@ public class TimestampedFileInputSplitTest {
        @Test
        public void testSplitComparison() {
                TimestampedFileInputSplit richFirstSplit =
-                       new TimestampedFileInputSplit(10, 3, new 
Path("test/test1"), 0, 100, null);
+                       new TimestampedFileInputSplit(0, 3, new 
Path("test/test1"), 0, 100, null);
 
                TimestampedFileInputSplit richSecondSplit =
                        new TimestampedFileInputSplit(10, 2, new 
Path("test/test2"), 0, 100, null);
@@ -69,15 +74,32 @@ public class TimestampedFileInputSplitTest {
                TimestampedFileInputSplit richForthSplit =
                        new TimestampedFileInputSplit(11, 0, new 
Path("test/test3"), 0, 100, null);
 
-               // lexicographically on the path order
+               TimestampedFileInputSplit richFifthSplit =
+                       new TimestampedFileInputSplit(11, 1, new 
Path("test/test3"), 0, 100, null);
+
+               // smaller mod time
                Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 
0);
-               Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0);
+
+               // lexicographically on the path
+               Assert.assertTrue(richThirdSplit.compareTo(richFifthSplit) < 0);
 
                // same mod time, same file so smaller split number first
                Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 
0);
 
                // smaller modification time first
                Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
+
+               
Assert.assertTrue(richFirstSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
+               
Assert.assertTrue(richSecondSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
+               
Assert.assertTrue(richThirdSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
+               
Assert.assertTrue(richForthSplit.compareTo(TimestampedFileInputSplit.EOS) < 0);
+
+               Assert.assertEquals(0, 
TimestampedFileInputSplit.EOS.compareTo(TimestampedFileInputSplit.EOS));
+
+               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richFirstSplit) > 0);
+               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richSecondSplit) > 0);
+               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richThirdSplit) > 0);
+               
Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richForthSplit) > 0);
        }
 
        @Test
@@ -90,4 +112,55 @@ public class TimestampedFileInputSplitTest {
                        }
                }
        }
+
+       @Test
+       public void testPriorityQ() {
+               TimestampedFileInputSplit richFirstSplit =
+                       new TimestampedFileInputSplit(0, 3, new 
Path("test/test1"), 0, 100, null);
+
+               TimestampedFileInputSplit richSecondSplit =
+                       new TimestampedFileInputSplit(10, 2, new 
Path("test/test2"), 0, 100, null);
+
+               TimestampedFileInputSplit richThirdSplit =
+                       new TimestampedFileInputSplit(10, 1, new 
Path("test/test2"), 0, 100, null);
+
+               TimestampedFileInputSplit richForthSplit =
+                       new TimestampedFileInputSplit(11, 0, new 
Path("test/test3"), 0, 100, null);
+
+               TimestampedFileInputSplit richFifthSplit =
+                       new TimestampedFileInputSplit(11, 1, new 
Path("test/test3"), 0, 100, null);
+
+               TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS;
+
+               Queue<TimestampedFileInputSplit> pendingSplits = new 
PriorityQueue<>();
+
+               pendingSplits.add(eos);
+               pendingSplits.add(richSecondSplit);
+               pendingSplits.add(richForthSplit);
+               pendingSplits.add(eos);
+               pendingSplits.add(richFirstSplit);
+               pendingSplits.add(richFifthSplit);
+               pendingSplits.add(richFifthSplit);
+               pendingSplits.add(richThirdSplit);
+
+               List<TimestampedFileInputSplit> actualSortedSplits = new 
ArrayList<>();
+               while (true) {
+                       actualSortedSplits.add(pendingSplits.poll());
+                       if (pendingSplits.isEmpty()) {
+                               break;
+                       }
+               }
+
+               List<TimestampedFileInputSplit> expectedSortedSplits = new 
ArrayList<>();
+               expectedSortedSplits.add(richFirstSplit);
+               expectedSortedSplits.add(richThirdSplit);
+               expectedSortedSplits.add(richSecondSplit);
+               expectedSortedSplits.add(richForthSplit);
+               expectedSortedSplits.add(richFifthSplit);
+               expectedSortedSplits.add(richFifthSplit);
+               expectedSortedSplits.add(eos);
+               expectedSortedSplits.add(eos);
+
+               Assert.assertArrayEquals(expectedSortedSplits.toArray(), 
actualSortedSplits.toArray());
+       }
 }

Reply via email to