Repository: flink
Updated Branches:
  refs/heads/release-1.2 19fe04758 -> 5168b9f62


[FLINK-5907] [java] Fix handling of trailing empty fields in CsvInputFormat.

This closes #3417.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5168b9f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5168b9f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5168b9f6

Branch: refs/heads/release-1.2
Commit: 5168b9f62a05176aca5bd3c094241daaa4d14b2e
Parents: 19fe047
Author: Kurt Young <[email protected]>
Authored: Sat Feb 25 16:37:37 2017 +0800
Committer: Fabian Hueske <[email protected]>
Committed: Tue Feb 28 00:49:35 2017 +0100

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    | 30 +++++++-
 .../apache/flink/types/parser/FieldParser.java  | 21 +++++
 .../common/io/GenericCsvInputFormatTest.java    |  4 +-
 .../flink/types/parser/FieldParserTest.java     | 46 +++++++++++
 .../flink/api/java/io/RowCsvInputFormat.java    | 13 +++-
 .../flink/api/java/io/CsvInputFormatTest.java   | 81 +++++++++++++++++++-
 .../api/java/io/RowCsvInputFormatTest.java      | 75 ++++++++++++++++--
 7 files changed, 258 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 20c643e..b934d41 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -358,14 +358,14 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                for (int field = 0, output = 0; field < fieldIncluded.length; 
field++) {
                        
                        // check valid start position
-                       if (startPos >= limit) {
+                       if (startPos > limit || (startPos == limit && field != 
fieldIncluded.length - 1)) {
                                if (lenient) {
                                        return false;
                                } else {
                                        throw new ParseException("Row too 
short: " + new String(bytes, offset, numBytes));
                                }
                        }
-                       
+
                        if (fieldIncluded[field]) {
                                // parse field
                                @SuppressWarnings("unchecked")
@@ -373,7 +373,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                                Object reuse = holders[output];
                                startPos = 
parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse);
                                holders[output] = parser.getLastResult();
-                               
+
                                // check parse result
                                if (startPos < 0) {
                                        // no good
@@ -387,6 +387,17 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                                                                + "in file: " + 
filePath);
                                        }
                                }
+                               else if (startPos == limit
+                                               && field != 
fieldIncluded.length - 1
+                                               && 
!FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) {
+                                       // We are at the end of the record, but 
not all fields have been read
+                                       // and the end is not a field delimiter 
indicating an empty last field.
+                                       if (lenient) {
+                                               return false;
+                                       } else {
+                                               throw new ParseException("Row 
too short: " + new String(bytes, offset, numBytes));
+                                       }
+                               }
                                output++;
                        }
                        else {
@@ -398,6 +409,19 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                                                throw new ParseException("Line 
could not be parsed: '" + lineAsString+"'\n"
                                                                + "Expect field 
types: "+fieldTypesToString()+" \n"
                                                                + "in file: 
"+filePath);
+                                       } else {
+                                               return false;
+                                       }
+                               }
+                               else if (startPos == limit
+                                               && field != 
fieldIncluded.length - 1
+                                               && 
!FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) {
+                                       // We are at the end of the record, but 
not all fields have been read
+                                       // and the end is not a field delimiter 
indicating an empty last field.
+                                       if (lenient) {
+                                               return false;
+                                       } else {
+                                               throw new ParseException("Row 
too short: " + new String(bytes, offset, numBytes));
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index cf3c83d..c45f820 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -156,6 +156,27 @@ public abstract class FieldParser<T> {
                return true;
                
        }
+
+       /**
+        * Checks if the given bytes end with the delimiter at the given end 
position.
+        *
+        * @param bytes  The byte array that holds the value.
+        * @param endPos The index of the byte array where the check for the 
delimiter ends (inclusive).
+        * @param delim  The delimiter to check for.
+        *
+        * @return true if a delimiter ends at the given end position, false 
otherwise.
+        */
+       public static final boolean endsWithDelimiter(byte[] bytes, int endPos, 
byte[] delim) {
+               if (endPos < delim.length - 1) {
+                       return false;
+               }
+               for (int pos = 0; pos < delim.length; ++pos) {
+                       if (delim[pos] != bytes[endPos - delim.length + 1 + 
pos]) {
+                               return false;
+                       }
+               }
+               return true;
+       }
        
        /**
         * Sets the error state of the parser. Called by subclasses of the 
parser to set the type of error

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index c11a573..4873fa8 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -522,14 +522,14 @@ public class GenericCsvInputFormatTest {
                                                                        
"kkz|777|foobar|hhg\n" +  // wrong data type in field
                                                                        
"kkz|777foobarhhg  \n" +  // too short, a skipped field never ends
                                                                        
"xyx|ignored|42|\n";      // another good line
-                       final FileInputSplit split = 
createTempFile(fileContent);       
+                       final FileInputSplit split = 
createTempFile(fileContent);
                
                        final Configuration parameters = new Configuration();
 
                        format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(StringValue.class, null, 
IntValue.class);
                        format.setLenient(true);
-                       
+
                        format.configure(parameters);
                        format.open(split);
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
new file mode 100644
index 0000000..bcb2bfb
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.parser;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class FieldParserTest {
+
+       @Test
+       public void testDelimiterNext() throws Exception {
+               byte[] bytes = "aaabc".getBytes();
+               byte[] delim = "aa".getBytes();
+               assertTrue(FieldParser.delimiterNext(bytes, 0, delim));
+               assertTrue(FieldParser.delimiterNext(bytes, 1, delim));
+               assertFalse(FieldParser.delimiterNext(bytes, 2, delim));
+       }
+
+       @Test
+       public void testEndsWithDelimiter() throws Exception {
+               byte[] bytes = "aabc".getBytes();
+               byte[] delim = "ab".getBytes();
+               assertFalse(FieldParser.endsWithDelimiter(bytes, 0, delim));
+               assertFalse(FieldParser.endsWithDelimiter(bytes, 1, delim));
+               assertTrue(FieldParser.endsWithDelimiter(bytes, 2, delim));
+               assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index af2e9e4..ce37c74 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -151,7 +151,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> 
implements ResultType
                while (field < fieldIncluded.length) {
 
                        // check valid start position
-                       if (startPos >= limit) {
+                       if (startPos > limit || (startPos == limit && field != 
fieldIncluded.length - 1)) {
                                if (isLenient()) {
                                        return false;
                                } else {
@@ -198,6 +198,17 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> 
implements ResultType
                                throw new 
ParseException(String.format("Unexpected parser position for column %1$s of row 
'%2$s'",
                                        field, new String(bytes, offset, 
numBytes)));
                        }
+                       else if (startPos == limit
+                                       && field != fieldIncluded.length - 1
+                                       && 
!FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelimiter)) {
+                               // We are at the end of the record, but not all 
fields have been read
+                               // and the end is not a field delimiter 
indicating an empty last field.
+                               if (isLenient()) {
+                                       return false;
+                               } else {
+                                       throw new ParseException("Row too 
short: " + new String(bytes, offset, numBytes));
+                               }
+                       }
 
                        field++;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index cc0d5bc..a303ff7 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -430,7 +430,7 @@ public class CsvInputFormatTest {
                        assertEquals("", result.f0);
                        assertEquals("", result.f1);
                        assertEquals("", result.f2);
-                       
+
                        result = format.nextRecord(result);
                        assertNull(result);
                        assertTrue(format.reachedEnd());
@@ -441,6 +441,57 @@ public class CsvInputFormatTest {
        }
 
        @Test
+       public void testTailingEmptyFields() throws Exception {
+               final String fileContent = "aa,bb,cc\n" + // ok
+                               "aa,bb,\n" +  // the last field is empty
+                               "aa,,\n" +    // the last two fields are empty
+                               ",,\n" +      // all fields are empty
+                               "aa,bb";      // row too short
+               final FileInputSplit split = createTempFile(fileContent);
+
+               final TupleTypeInfo<Tuple3<String, String, String>> typeInfo =
+                               
TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+               final CsvInputFormat<Tuple3<String, String, String>> format =
+                               new TupleCsvInputFormat<Tuple3<String, String, 
String>>(PATH, typeInfo);
+
+               format.setFieldDelimiter(",");
+
+               format.configure(new Configuration());
+               format.open(split);
+
+               Tuple3<String, String, String> result = new Tuple3<String, 
String, String>();
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("aa", result.f0);
+               assertEquals("bb", result.f1);
+               assertEquals("cc", result.f2);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("aa", result.f0);
+               assertEquals("bb", result.f1);
+               assertEquals("", result.f2);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("aa", result.f0);
+               assertEquals("", result.f1);
+               assertEquals("", result.f2);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("", result.f0);
+               assertEquals("", result.f1);
+               assertEquals("", result.f2);
+
+               try {
+                       format.nextRecord(result);
+                       fail("Parse Exception was not thrown! (Row too short)");
+               } catch (ParseException e) {}
+       }
+
+       @Test
        public void testIntegerFields() throws IOException {
                try {
                        final String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|\n";
@@ -957,6 +1008,34 @@ public class CsvInputFormatTest {
        }
 
        @Test
+       public void testPojoTypeWithTrailingEmptyFields() throws Exception {
+               final String fileContent = "123,,3.123,,\n456,BBB,3.23,,";
+               final FileInputSplit split = createTempFile(fileContent);
+
+               @SuppressWarnings("unchecked")
+               PojoTypeInfo<PrivatePojoItem> typeInfo = 
(PojoTypeInfo<PrivatePojoItem>) 
TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+               CsvInputFormat<PrivatePojoItem> inputFormat = new 
PojoCsvInputFormat<PrivatePojoItem>(PATH, typeInfo);
+
+               inputFormat.configure(new Configuration());
+               inputFormat.open(split);
+
+               PrivatePojoItem item = new PrivatePojoItem();
+               inputFormat.nextRecord(item);
+
+               assertEquals(123, item.field1);
+               assertEquals("", item.field2);
+               assertEquals(Double.valueOf(3.123), item.field3);
+               assertEquals("", item.field4);
+
+               inputFormat.nextRecord(item);
+
+               assertEquals(456, item.field1);
+               assertEquals("BBB", item.field2);
+               assertEquals(Double.valueOf(3.23), item.field3);
+               assertEquals("", item.field4);
+       }
+
+       @Test
        public void testPojoTypeWithMappingInformation() throws Exception {
                File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
                tempFile.deleteOnExit();

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index b819641..f6bda30 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -230,7 +230,7 @@ public class RowCsvInputFormatTest {
 
        @Test
        public void readStringFields() throws Exception {
-               String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
+               String fileContent = "abc|def|ghijk\nabc||hhg\n|||\n||";
 
                FileInputSplit split = createTempFile(fileContent);
 
@@ -264,13 +264,19 @@ public class RowCsvInputFormatTest {
                assertEquals("", result.getField(2));
 
                result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               result = format.nextRecord(result);
                assertNull(result);
                assertTrue(format.reachedEnd());
        }
 
        @Test
        public void readMixedQuotedStringFields() throws Exception {
-               String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
+               String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||\n";
 
                FileInputSplit split = createTempFile(fileContent);
 
@@ -351,6 +357,65 @@ public class RowCsvInputFormatTest {
        }
 
        @Test
+       public void testTailingEmptyFields() throws Exception {
+               String fileContent = "abc|-def|-ghijk\n" +
+                               "abc|-def|-\n" +
+                               "abc|-|-\n" +
+                               "|-|-|-\n" +
+                               "|-|-\n" +
+                               "abc|-def\n";
+
+               FileInputSplit split = createTempFile(fileContent);
+
+               TypeInformation[] fieldTypes = new TypeInformation[]{
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO};
+
+               RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
fieldTypes, "\n", "|");
+               format.setFieldDelimiter("|-");
+               format.configure(new Configuration());
+               format.open(split);
+
+               Row result = new Row(3);
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("def", result.getField(1));
+               assertEquals("ghijk", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("def", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("abc", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               result = format.nextRecord(result);
+               assertNotNull(result);
+               assertEquals("", result.getField(0));
+               assertEquals("", result.getField(1));
+               assertEquals("", result.getField(2));
+
+               try {
+                       format.nextRecord(result);
+                       fail("Parse Exception was not thrown! (Row too short)");
+               } catch (ParseException e) {}
+       }
+
+       @Test
        public void testIntegerFields() throws Exception {
                String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|\n";
 
@@ -396,12 +461,12 @@ public class RowCsvInputFormatTest {
        public void testEmptyFields() throws Exception {
                String fileContent =
                        ",,,,,,,,\n" +
+                               ",,,,,,,\n" +
                                ",,,,,,,,\n" +
+                               ",,,,,,,\n" +
                                ",,,,,,,,\n" +
                                ",,,,,,,,\n" +
-                               ",,,,,,,,\n" +
-                               ",,,,,,,,\n" +
-                               ",,,,,,,,\n" +
+                               ",,,,,,,\n" +
                                ",,,,,,,,\n";
 
                FileInputSplit split = createTempFile(fileContent);

Reply via email to