Repository: flink
Updated Branches:
  refs/heads/master 5fb267de6 -> ae0fbff76


[FLINK-5771] [core] Fix multi-char delimiter detection in DelimitedInputFormat.

- Add a test case to validate correct delimiter detection.
- Remove a couple of try-catch blocks from existing tests.

This closes #3316.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6a97e48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6a97e48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6a97e48

Branch: refs/heads/master
Commit: d6a97e480e294e4779eb320a8b57983122a6cf63
Parents: 5fb267d
Author: Fabian Hueske <[email protected]>
Authored: Tue Feb 14 22:02:26 2017 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Wed Feb 15 23:53:09 2017 +0100

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  11 +-
 .../api/common/io/DelimitedInputFormatTest.java | 313 +++++++++----------
 2 files changed, 164 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6a97e48/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 5c8dfc1..a83d45f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -567,17 +567,24 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
                        int startPos = this.readPos;
                        int count;
 
+                       // Search for next occurrence of delimiter in read buffer.
                        while (this.readPos < this.limit && i < 
this.delimiter.length) {
-                               if ((this.readBuffer[this.readPos++]) == 
this.delimiter[i]) {
+                               if ((this.readBuffer[this.readPos]) == 
this.delimiter[i]) {
+                                       // Found the expected delimiter 
character. Continue looking for the next character of delimiter.
                                        i++;
                                } else {
+                                       // Delimiter does not match.
+                                       // We have to reset the read position 
to the character after the first matching character
+                                       //   and search for the whole delimiter 
again.
+                                       readPos -= i;
                                        i = 0;
                                }
+                               readPos++;
                        }
 
                        // check why we dropped out
                        if (i == this.delimiter.length) {
-                               // line end
+                               // delimiter found
                                int totalBytesRead = this.readPos - startPos;
                                this.offset += countInWrapBuffer + 
totalBytesRead;
                                count = totalBytesRead - this.delimiter.length;

http://git-wip-us.apache.org/repos/asf/flink/blob/d6a97e48/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 219365a..7ce0a2e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -173,32 +173,53 @@ public class DelimitedInputFormatTest {
        }
        
        @Test
-       public void testReadCustomDelimiter() {
-               try {
-                       final String myString = "my key|my val$$$my 
key2\n$$ctd.$$|my value2";
-                       final FileInputSplit split = createTempFile(myString);
-                       
-                       final Configuration parameters = new Configuration();
-                       
-                       format.setDelimiter("$$$");
-                       format.configure(parameters);
-                       format.open(split);
-       
-                       String first = format.nextRecord(null);
-                       assertNotNull(first);
-                       assertEquals("my key|my val", first);
-
-                       String second = format.nextRecord(null);
-                       assertNotNull(second);
-                       assertEquals("my key2\n$$ctd.$$|my value2", second);
-                       
-                       assertNull(format.nextRecord(null));
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+       public void testReadCustomDelimiter() throws IOException {
+               final String myString = "my key|my val$$$my key2\n$$ctd.$$|my 
value2";
+               final FileInputSplit split = createTempFile(myString);
+
+               final Configuration parameters = new Configuration();
+
+               format.setDelimiter("$$$");
+               format.configure(parameters);
+               format.open(split);
+
+               String first = format.nextRecord(null);
+               assertNotNull(first);
+               assertEquals("my key|my val", first);
+
+               String second = format.nextRecord(null);
+               assertNotNull(second);
+               assertEquals("my key2\n$$ctd.$$|my value2", second);
+
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+       }
+
+       @Test
+       public void testMultiCharDelimiter() throws IOException {
+               final String myString = "www112xx1123yyy11123zzzzz1123";
+               final FileInputSplit split = createTempFile(myString);
+
+               final Configuration parameters = new Configuration();
+
+               format.setDelimiter("1123");
+               format.configure(parameters);
+               format.open(split);
+
+               String first = format.nextRecord(null);
+               assertNotNull(first);
+               assertEquals("www112xx", first);
+
+               String second = format.nextRecord(null);
+               assertNotNull(second);
+               assertEquals("yyy1", second);
+
+               String third = format.nextRecord(null);
+               assertNotNull(third);
+               assertEquals("zzzzz", third);
+
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
        }
 
        @Test
@@ -244,164 +265,140 @@ public class DelimitedInputFormatTest {
         * Tests that the records are read correctly when the split boundary is 
in the middle of a record.
         */
        @Test
-       public void testReadOverSplitBoundariesUnaligned() {
-               try {
-                       final String myString = "value1\nvalue2\nvalue3";
-                       final FileInputSplit split = createTempFile(myString);
-                       
-                       FileInputSplit split1 = new FileInputSplit(0, 
split.getPath(), 0, split.getLength() / 2, split.getHostnames());
-                       FileInputSplit split2 = new FileInputSplit(1, 
split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
-
-                       final Configuration parameters = new Configuration();
-                       
-                       format.configure(parameters);
-                       format.open(split1);
-                       
-                       assertEquals("value1", format.nextRecord(null));
-                       assertEquals("value2", format.nextRecord(null));
-                       assertNull(format.nextRecord(null));
-                       assertTrue(format.reachedEnd());
-                       
-                       format.close();
-                       format.open(split2);
+       public void testReadOverSplitBoundariesUnaligned() throws IOException {
+               final String myString = "value1\nvalue2\nvalue3";
+               final FileInputSplit split = createTempFile(myString);
 
-                       assertEquals("value3", format.nextRecord(null));
-                       assertNull(format.nextRecord(null));
-                       assertTrue(format.reachedEnd());
-                       
-                       format.close();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 
0, split.getLength() / 2, split.getHostnames());
+               FileInputSplit split2 = new FileInputSplit(1, split.getPath(), 
split1.getLength(), split.getLength(), split.getHostnames());
+
+               final Configuration parameters = new Configuration();
+
+               format.configure(parameters);
+               format.open(split1);
+
+               assertEquals("value1", format.nextRecord(null));
+               assertEquals("value2", format.nextRecord(null));
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+
+               format.close();
+               format.open(split2);
+
+               assertEquals("value3", format.nextRecord(null));
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+
+               format.close();
        }
 
        /**
         * Tests that the correct number of records is read when the split 
boundary is exact at the record boundary.
         */
        @Test
-       public void testReadWithBufferSizeIsMultple() {
-               try {
-                       final String myString = 
"aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
-                       final FileInputSplit split = createTempFile(myString);
-
-                       FileInputSplit split1 = new FileInputSplit(0, 
split.getPath(), 0, split.getLength() / 2, split.getHostnames());
-                       FileInputSplit split2 = new FileInputSplit(1, 
split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
+       public void testReadWithBufferSizeIsMultiple() throws IOException {
+               final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
+               final FileInputSplit split = createTempFile(myString);
 
-                       final Configuration parameters = new Configuration();
+               FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 
0, split.getLength() / 2, split.getHostnames());
+               FileInputSplit split2 = new FileInputSplit(1, split.getPath(), 
split1.getLength(), split.getLength(), split.getHostnames());
 
-                       format.setBufferSize(2 * ((int) split1.getLength()));
-                       format.configure(parameters);
+               final Configuration parameters = new Configuration();
 
-                       String next;
-                       int count = 0;
+               format.setBufferSize(2 * ((int) split1.getLength()));
+               format.configure(parameters);
 
-                       // read split 1
-                       format.open(split1);
-                       while ((next = format.nextRecord(null)) != null) {
-                               assertEquals(7, next.length());
-                               count++;
-                       }
-                       assertNull(format.nextRecord(null));
-                       assertTrue(format.reachedEnd());
-                       format.close();
-                       
-                       // this one must have read one too many, because the 
next split will skipp the trailing remainder
-                       // which happens to be one full record
-                       assertEquals(3, count);
-
-                       // read split 2
-                       format.open(split2);
-                       while ((next = format.nextRecord(null)) != null) {
-                               assertEquals(7, next.length());
-                               count++;
-                       }
-                       format.close();
+               String next;
+               int count = 0;
 
-                       assertEquals(4, count);
+               // read split 1
+               format.open(split1);
+               while ((next = format.nextRecord(null)) != null) {
+                       assertEquals(7, next.length());
+                       count++;
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+               format.close();
+
+               // this one must have read one too many, because the next split will skip the trailing remainder
+               // which happens to be one full record
+               assertEquals(3, count);
+
+               // read split 2
+               format.open(split2);
+               while ((next = format.nextRecord(null)) != null) {
+                       assertEquals(7, next.length());
+                       count++;
                }
+               format.close();
+
+               assertEquals(4, count);
        }
 
        @Test
-       public void testReadExactlyBufferSize() {
-               try {
-                       final String myString = 
"aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
-                       
-                       final FileInputSplit split = createTempFile(myString);
-                       final Configuration parameters = new Configuration();
-                       
-                       format.setBufferSize((int) split.getLength());
-                       format.configure(parameters);
-                       format.open(split);
+       public void testReadExactlyBufferSize() throws IOException {
+               final String myString = "aaaaaaa\nbbbbbbb\nccccccc\nddddddd\n";
 
-                       String next;
-                       int count = 0;
-                       while ((next = format.nextRecord(null)) != null) {
-                               assertEquals(7, next.length());
-                               count++;
-                       }
-                       assertNull(format.nextRecord(null));
-                       assertTrue(format.reachedEnd());
+               final FileInputSplit split = createTempFile(myString);
+               final Configuration parameters = new Configuration();
 
-                       format.close();
-                       
-                       assertEquals(4, count);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+               format.setBufferSize((int) split.getLength());
+               format.configure(parameters);
+               format.open(split);
+
+               String next;
+               int count = 0;
+               while ((next = format.nextRecord(null)) != null) {
+                       assertEquals(7, next.length());
+                       count++;
                }
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+
+               format.close();
+
+               assertEquals(4, count);
        }
 
        @Test
-       public void testReadRecordsLargerThanBuffer() {
-               try {
-                       final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
-                                                                       
"bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
-                                                                       
"ccccccccccccccccccc\n" +
-                                                                       
"ddddddddddddddddddddddddddddddddddd\n";
-
-                       final FileInputSplit split = createTempFile(myString);
-                       FileInputSplit split1 = new FileInputSplit(0, 
split.getPath(), 0, split.getLength() / 2, split.getHostnames());
-                       FileInputSplit split2 = new FileInputSplit(1, 
split.getPath(), split1.getLength(), split.getLength(), split.getHostnames());
-                       
-                       final Configuration parameters = new Configuration();
-
-                       format.setBufferSize(8);
-                       format.configure(parameters);
-
-                       String next;
-                       List<String> result = new ArrayList<String>();
-                       
-                       
-                       format.open(split1);
-                       while ((next = format.nextRecord(null)) != null) {
-                               result.add(next);
-                       }
-                       assertNull(format.nextRecord(null));
-                       assertTrue(format.reachedEnd());
-                       format.close();
+       public void testReadRecordsLargerThanBuffer() throws IOException {
+               final String myString = "aaaaaaaaaaaaaaaaaaaaa\n" +
+                                                               
"bbbbbbbbbbbbbbbbbbbbbbbbb\n" +
+                                                               
"ccccccccccccccccccc\n" +
+                                                               
"ddddddddddddddddddddddddddddddddddd\n";
 
-                       format.open(split2);
-                       while ((next = format.nextRecord(null)) != null) {
-                               result.add(next);
-                       }
-                       assertNull(format.nextRecord(null));
-                       assertTrue(format.reachedEnd());
-                       format.close();
-                       
-                       assertEquals(4, result.size());
-                       assertEquals(Arrays.asList(myString.split("\n")), 
result);
+               final FileInputSplit split = createTempFile(myString);
+               FileInputSplit split1 = new FileInputSplit(0, split.getPath(), 
0, split.getLength() / 2, split.getHostnames());
+               FileInputSplit split2 = new FileInputSplit(1, split.getPath(), 
split1.getLength(), split.getLength(), split.getHostnames());
+
+               final Configuration parameters = new Configuration();
+
+               format.setBufferSize(8);
+               format.configure(parameters);
+
+               String next;
+               List<String> result = new ArrayList<String>();
+
+
+               format.open(split1);
+               while ((next = format.nextRecord(null)) != null) {
+                       result.add(next);
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+               format.close();
+
+               format.open(split2);
+               while ((next = format.nextRecord(null)) != null) {
+                       result.add(next);
                }
+               assertNull(format.nextRecord(null));
+               assertTrue(format.reachedEnd());
+               format.close();
+
+               assertEquals(4, result.size());
+               assertEquals(Arrays.asList(myString.split("\n")), result);
        }
 
        static FileInputSplit createTempFile(String contents) throws 
IOException {

Reply via email to