Repository: flink Updated Branches: refs/heads/release-1.2 1037ace42 -> 3b4f6cf8c
[FLINK-5771] [core] Fix multi-char delimiter detection in DelimitedInputFormat. - Add a test case to validate correct delimiter detection. - Remove a couple of try-catch blocks from existing tests. This closes #3316. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b4f6cf8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b4f6cf8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b4f6cf8 Branch: refs/heads/release-1.2 Commit: 3b4f6cf8c8283c221c8ab58f544cfe01c092fe6b Parents: 1037ace Author: Fabian Hueske <[email protected]> Authored: Tue Feb 14 22:02:26 2017 +0100 Committer: Fabian Hueske <[email protected]> Committed: Wed Feb 15 23:58:34 2017 +0100 ---------------------------------------------------------------------- .../api/common/io/DelimitedInputFormat.java | 11 +- .../api/common/io/DelimitedInputFormatTest.java | 313 +++++++++---------- 2 files changed, 164 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3b4f6cf8/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index 5c8dfc1..a83d45f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -567,17 +567,24 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple int startPos = this.readPos; int count; + // Search for next occurrence of delimiter in read buffer. 
while (this.readPos < this.limit && i < this.delimiter.length) { - if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) { + if ((this.readBuffer[this.readPos]) == this.delimiter[i]) { + // Found the expected delimiter character. Continue looking for the next character of delimiter. i++; } else { + // Delimiter does not match. + // We have to reset the read position to the character after the first matching character + // and search for the whole delimiter again. + readPos -= i; i = 0; } + readPos++; } // check why we dropped out if (i == this.delimiter.length) { - // line end + // delimiter found int totalBytesRead = this.readPos - startPos; this.offset += countInWrapBuffer + totalBytesRead; count = totalBytesRead - this.delimiter.length; http://git-wip-us.apache.org/repos/asf/flink/blob/3b4f6cf8/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 219365a..7ce0a2e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -173,32 +173,53 @@ public class DelimitedInputFormatTest { } @Test - public void testReadCustomDelimiter() { - try { - final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2"; - final FileInputSplit split = createTempFile(myString); - - final Configuration parameters = new Configuration(); - - format.setDelimiter("$$$"); - format.configure(parameters); - format.open(split); - - String first = format.nextRecord(null); - assertNotNull(first); - assertEquals("my key|my val", first); - - String second = format.nextRecord(null); - assertNotNull(second); - assertEquals("my key2\n$$ctd.$$|my value2", second); 
- - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public void testReadCustomDelimiter() throws IOException { + final String myString = "my key|my val$$$my key2\n$$ctd.$$|my value2"; + final FileInputSplit split = createTempFile(myString); + + final Configuration parameters = new Configuration(); + + format.setDelimiter("$$$"); + format.configure(parameters); + format.open(split); + + String first = format.nextRecord(null); + assertNotNull(first); + assertEquals("my key|my val", first); + + String second = format.nextRecord(null); + assertNotNull(second); + assertEquals("my key2\n$$ctd.$$|my value2", second); + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + } + + @Test + public void testMultiCharDelimiter() throws IOException { + final String myString = "www112xx1123yyy11123zzzzz1123"; + final FileInputSplit split = createTempFile(myString); + + final Configuration parameters = new Configuration(); + + format.setDelimiter("1123"); + format.configure(parameters); + format.open(split); + + String first = format.nextRecord(null); + assertNotNull(first); + assertEquals("www112xx", first); + + String second = format.nextRecord(null); + assertNotNull(second); + assertEquals("yyy1", second); + + String third = format.nextRecord(null); + assertNotNull(third); + assertEquals("zzzzz", third); + + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); } @Test @@ -244,164 +265,140 @@ public class DelimitedInputFormatTest { * Tests that the records are read correctly when the split boundary is in the middle of a record. 
*/ @Test - public void testReadOverSplitBoundariesUnaligned() { - try { - final String myString = "value1\nvalue2\nvalue3"; - final FileInputSplit split = createTempFile(myString); - - FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); - FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); - - final Configuration parameters = new Configuration(); - - format.configure(parameters); - format.open(split1); - - assertEquals("value1", format.nextRecord(null)); - assertEquals("value2", format.nextRecord(null)); - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); - - format.close(); - format.open(split2); + public void testReadOverSplitBoundariesUnaligned() throws IOException { + final String myString = "value1\nvalue2\nvalue3"; + final FileInputSplit split = createTempFile(myString); - assertEquals("value3", format.nextRecord(null)); - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); - - format.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); + FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); + + final Configuration parameters = new Configuration(); + + format.configure(parameters); + format.open(split1); + + assertEquals("value1", format.nextRecord(null)); + assertEquals("value2", format.nextRecord(null)); + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + + format.close(); + format.open(split2); + + assertEquals("value3", format.nextRecord(null)); + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + + format.close(); } /** * Tests that the correct number of records is read when the split boundary is exact at the record 
boundary. */ @Test - public void testReadWithBufferSizeIsMultple() { - try { - final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n"; - final FileInputSplit split = createTempFile(myString); - - FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); - FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); + public void testReadWithBufferSizeIsMultiple() throws IOException { + final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n"; + final FileInputSplit split = createTempFile(myString); - final Configuration parameters = new Configuration(); + FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); + FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); - format.setBufferSize(2 * ((int) split1.getLength())); - format.configure(parameters); + final Configuration parameters = new Configuration(); - String next; - int count = 0; + format.setBufferSize(2 * ((int) split1.getLength())); + format.configure(parameters); - // read split 1 - format.open(split1); - while ((next = format.nextRecord(null)) != null) { - assertEquals(7, next.length()); - count++; - } - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); - format.close(); - - // this one must have read one too many, because the next split will skipp the trailing remainder - // which happens to be one full record - assertEquals(3, count); - - // read split 2 - format.open(split2); - while ((next = format.nextRecord(null)) != null) { - assertEquals(7, next.length()); - count++; - } - format.close(); + String next; + int count = 0; - assertEquals(4, count); + // read split 1 + format.open(split1); + while ((next = format.nextRecord(null)) != null) { + assertEquals(7, next.length()); + count++; } - catch (Exception e) { - 
e.printStackTrace(); - fail(e.getMessage()); + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + format.close(); + + // this one must have read one too many, because the next split will skip the trailing remainder + // which happens to be one full record + assertEquals(3, count); + + // read split 2 + format.open(split2); + while ((next = format.nextRecord(null)) != null) { + assertEquals(7, next.length()); + count++; } + format.close(); + + assertEquals(4, count); } @Test - public void testReadExactlyBufferSize() { - try { - final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n"; - - final FileInputSplit split = createTempFile(myString); - final Configuration parameters = new Configuration(); - - format.setBufferSize((int) split.getLength()); - format.configure(parameters); - format.open(split); + public void testReadExactlyBufferSize() throws IOException { + final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n"; - String next; - int count = 0; - while ((next = format.nextRecord(null)) != null) { - assertEquals(7, next.length()); - count++; - } - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); + final FileInputSplit split = createTempFile(myString); + final Configuration parameters = new Configuration(); - format.close(); - - assertEquals(4, count); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + format.setBufferSize((int) split.getLength()); + format.configure(parameters); + format.open(split); + + String next; + int count = 0; + while ((next = format.nextRecord(null)) != null) { + assertEquals(7, next.length()); + count++; } + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + + format.close(); + + assertEquals(4, count); } @Test - public void testReadRecordsLargerThanBuffer() { - try { - final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" + - "bbbbbbbbbbbbbbbbbbbbbbbbb\n" + - "ccccccccccccccccccc\n" + - "ddddddddddddddddddddddddddddddddddd\n"; - - 
final FileInputSplit split = createTempFile(myString); - FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); - FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); - - final Configuration parameters = new Configuration(); - - format.setBufferSize(8); - format.configure(parameters); - - String next; - List<String> result = new ArrayList<String>(); - - - format.open(split1); - while ((next = format.nextRecord(null)) != null) { - result.add(next); - } - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); - format.close(); + public void testReadRecordsLargerThanBuffer() throws IOException { + final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" + + "bbbbbbbbbbbbbbbbbbbbbbbbb\n" + + "ccccccccccccccccccc\n" + + "ddddddddddddddddddddddddddddddddddd\n"; - format.open(split2); - while ((next = format.nextRecord(null)) != null) { - result.add(next); - } - assertNull(format.nextRecord(null)); - assertTrue(format.reachedEnd()); - format.close(); - - assertEquals(4, result.size()); - assertEquals(Arrays.asList(myString.split("\n")), result); + final FileInputSplit split = createTempFile(myString); + FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 0, split.getLength() / 2, split.getHostnames()); + FileInputSplit split2 = new FileInputSplit(1, split.getPath(), split1.getLength(), split.getLength(), split.getHostnames()); + + final Configuration parameters = new Configuration(); + + format.setBufferSize(8); + format.configure(parameters); + + String next; + List<String> result = new ArrayList<String>(); + + + format.open(split1); + while ((next = format.nextRecord(null)) != null) { + result.add(next); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + format.close(); + + format.open(split2); + while ((next = 
format.nextRecord(null)) != null) { + result.add(next); } + assertNull(format.nextRecord(null)); + assertTrue(format.reachedEnd()); + format.close(); + + assertEquals(4, result.size()); + assertEquals(Arrays.asList(myString.split("\n")), result); } static FileInputSplit createTempFile(String contents) throws IOException {
