Repository: flink Updated Branches: refs/heads/master ba2d007e5 -> 59a5551ef
[hotfix] Fixes the TimestampedFileInputSplit.EOS comparison. This closes #2718 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59a5551e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59a5551e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59a5551e Branch: refs/heads/master Commit: 59a5551ef81a57007035526ab2c14ec07f13971b Parents: ba2d007 Author: kl0u <[email protected]> Authored: Fri Oct 28 14:10:25 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon Oct 31 19:07:18 2016 +0100 ---------------------------------------------------------------------- .../source/TimestampedFileInputSplit.java | 11 ++- .../TimestampedFileInputSplitTest.java | 81 +++++++++++++++++++- 2 files changed, 87 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/59a5551e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java index 323b3ab..6a3ba0d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java @@ -46,7 +46,7 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara /** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits.*/ public static final TimestampedFileInputSplit EOS = - new TimestampedFileInputSplit(Long.MIN_VALUE, -1, null, -1, -1, null); + new 
TimestampedFileInputSplit(Long.MAX_VALUE, -1, null, -1, -1, null); /** * Creates a {@link TimestampedFileInputSplit} based on the file modification time and @@ -108,6 +108,15 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara return modTimeComp > 0 ? 1 : -1; } + // the file input split allows for null paths + if (this.getPath() == o.getPath()) { + return 0; + } else if (this.getPath() == null) { + return 1; + } else if (o.getPath() == null) { + return -1; + } + int pathComp = this.getPath().compareTo(o.getPath()); return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber(); } http://git-wip-us.apache.org/repos/asf/flink/blob/59a5551e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java index 88bd822..9dc90d3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java @@ -17,12 +17,17 @@ */ package org.apache.flink.test.checkpointing; -import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; + public class TimestampedFileInputSplitTest { @Test @@ -58,7 +63,7 @@ public class TimestampedFileInputSplitTest { @Test public void testSplitComparison() { TimestampedFileInputSplit richFirstSplit = - new TimestampedFileInputSplit(10, 3, new Path("test/test1"), 0, 
100, null); + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); TimestampedFileInputSplit richSecondSplit = new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 0, 100, null); @@ -69,15 +74,32 @@ public class TimestampedFileInputSplitTest { TimestampedFileInputSplit richForthSplit = new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); - // lexicographically on the path order + TimestampedFileInputSplit richFifthSplit = + new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null); + + // smaller mod time Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0); - Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0); + + // lexicographically on the path + Assert.assertTrue(richThirdSplit.compareTo(richFifthSplit) < 0); // same mod time, same file so smaller split number first Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0); // smaller modification time first Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); + + Assert.assertTrue(richFirstSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); + Assert.assertTrue(richSecondSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); + Assert.assertTrue(richThirdSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); + Assert.assertTrue(richForthSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); + + Assert.assertEquals(0, TimestampedFileInputSplit.EOS.compareTo(TimestampedFileInputSplit.EOS)); + + Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richFirstSplit) > 0); + Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richSecondSplit) > 0); + Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richThirdSplit) > 0); + Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richForthSplit) > 0); } @Test @@ -90,4 +112,55 @@ public class TimestampedFileInputSplitTest { } } } + + @Test + public void testPriorityQ() { + TimestampedFileInputSplit richFirstSplit = + new 
TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit richSecondSplit = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit richThirdSplit = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit richForthSplit = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + TimestampedFileInputSplit richFifthSplit = + new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null); + + TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS; + + Queue<TimestampedFileInputSplit> pendingSplits = new PriorityQueue<>(); + + pendingSplits.add(eos); + pendingSplits.add(richSecondSplit); + pendingSplits.add(richForthSplit); + pendingSplits.add(eos); + pendingSplits.add(richFirstSplit); + pendingSplits.add(richFifthSplit); + pendingSplits.add(richFifthSplit); + pendingSplits.add(richThirdSplit); + + List<TimestampedFileInputSplit> actualSortedSplits = new ArrayList<>(); + while (true) { + actualSortedSplits.add(pendingSplits.poll()); + if (pendingSplits.isEmpty()) { + break; + } + } + + List<TimestampedFileInputSplit> expectedSortedSplits = new ArrayList<>(); + expectedSortedSplits.add(richFirstSplit); + expectedSortedSplits.add(richThirdSplit); + expectedSortedSplits.add(richSecondSplit); + expectedSortedSplits.add(richForthSplit); + expectedSortedSplits.add(richFifthSplit); + expectedSortedSplits.add(richFifthSplit); + expectedSortedSplits.add(eos); + expectedSortedSplits.add(eos); + + Assert.assertArrayEquals(expectedSortedSplits.toArray(), actualSortedSplits.toArray()); + } }
