Repository: flink Updated Branches: refs/heads/release-1.2 19fe04758 -> 5168b9f62
[FLINK-5907] [java] Fix handling of trailing empty fields in CsvInputFormat. This closes #3417. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5168b9f6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5168b9f6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5168b9f6 Branch: refs/heads/release-1.2 Commit: 5168b9f62a05176aca5bd3c094241daaa4d14b2e Parents: 19fe047 Author: Kurt Young <[email protected]> Authored: Sat Feb 25 16:37:37 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Tue Feb 28 00:49:35 2017 +0100 ---------------------------------------------------------------------- .../api/common/io/GenericCsvInputFormat.java | 30 +++++++- .../apache/flink/types/parser/FieldParser.java | 21 +++++ .../common/io/GenericCsvInputFormatTest.java | 4 +- .../flink/types/parser/FieldParserTest.java | 46 +++++++++++ .../flink/api/java/io/RowCsvInputFormat.java | 13 +++- .../flink/api/java/io/CsvInputFormatTest.java | 81 +++++++++++++++++++- .../api/java/io/RowCsvInputFormatTest.java | 75 ++++++++++++++++-- 7 files changed, 258 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 20c643e..b934d41 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -358,14 +358,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> for (int field = 0, output = 0; field < fieldIncluded.length; field++) { // check valid start position - if (startPos >= limit) { + if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) { if (lenient) { return false; } else { throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); } } - + if (fieldIncluded[field]) { // parse field @SuppressWarnings("unchecked") @@ -373,7 +373,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> Object reuse = holders[output]; startPos = parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse); holders[output] = parser.getLastResult(); - + // check parse result if (startPos < 0) { // no good @@ -387,6 +387,17 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> + "in file: " + filePath); } } + else if (startPos == limit + && field != fieldIncluded.length - 1 + && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) { + // We are at the end of the record, but not all fields have been read + // and the end is not a field delimiter indicating an empty last field. + if (lenient) { + return false; + } else { + throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); + } + } output++; } else { @@ -398,6 +409,19 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n" + "Expect field types: "+fieldTypesToString()+" \n" + "in file: "+filePath); + } else { + return false; + } + } + else if (startPos == limit + && field != fieldIncluded.length - 1 + && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) { + // We are at the end of the record, but not all fields have been read + // and the end is not a field delimiter indicating an empty last field. + if (lenient) { + return false; + } else { + throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index cf3c83d..c45f820 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -156,6 +156,27 @@ public abstract class FieldParser<T> { return true; } + + /** + * Checks if the given bytes ends with the delimiter at the given end position. + * + * @param bytes The byte array that holds the value. + * @param endPos The index of the byte array where the check for the delimiter ends. + * @param delim The delimiter to check for. + * + * @return true if a delimiter ends at the given end position, false otherwise. + */ + public static final boolean endsWithDelimiter(byte[] bytes, int endPos, byte[] delim) { + if (endPos < delim.length - 1) { + return false; + } + for (int pos = 0; pos < delim.length; ++pos) { + if (delim[pos] != bytes[endPos - delim.length + 1 + pos]) { + return false; + } + } + return true; + } /** * Sets the error state of the parser. Called by subclasses of the parser to set the type of error http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index c11a573..4873fa8 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -522,14 +522,14 @@ public class GenericCsvInputFormatTest { "kkz|777|foobar|hhg\n" + // wrong data type in field "kkz|777foobarhhg \n" + // too short, a skipped field never ends "xyx|ignored|42|\n"; // another good line - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); format.setFieldDelimiter("|"); format.setFieldTypesGeneric(StringValue.class, null, IntValue.class); format.setLenient(true); - + format.configure(parameters); format.open(split); http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java new file mode 100644 index 0000000..bcb2bfb --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.types.parser; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class FieldParserTest { + + @Test + public void testDelimiterNext() throws Exception { + byte[] bytes = "aaabc".getBytes(); + byte[] delim = "aa".getBytes(); + assertTrue(FieldParser.delimiterNext(bytes, 0, delim)); + assertTrue(FieldParser.delimiterNext(bytes, 1, delim)); + assertFalse(FieldParser.delimiterNext(bytes, 2, delim)); + } + + @Test + public void testEndsWithDelimiter() throws Exception { + byte[] bytes = "aabc".getBytes(); + byte[] delim = "ab".getBytes(); + assertFalse(FieldParser.endsWithDelimiter(bytes, 0, delim)); + assertFalse(FieldParser.endsWithDelimiter(bytes, 1, delim)); + assertTrue(FieldParser.endsWithDelimiter(bytes, 2, delim)); + assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java index af2e9e4..ce37c74 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java @@ -151,7 +151,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType while (field < fieldIncluded.length) { // check valid start position - if (startPos >= limit) { + if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) { if (isLenient()) { return false; } else { @@ -198,6 +198,17 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType throw new ParseException(String.format("Unexpected parser position for column %1$s of row '%2$s'", field, new String(bytes, offset, numBytes))); } + else if (startPos == limit + && field != fieldIncluded.length - 1 + && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelimiter)) { + // We are at the end of the record, but not all fields have been read + // and the end is not a field delimiter indicating an empty last field. + if (isLenient()) { + return false; + } else { + throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); + } + } field++; } http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index cc0d5bc..a303ff7 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -430,7 +430,7 @@ public class CsvInputFormatTest { assertEquals("", result.f0); assertEquals("", result.f1); assertEquals("", result.f2); - + result = format.nextRecord(result); assertNull(result); assertTrue(format.reachedEnd()); @@ -441,6 +441,57 @@ public class CsvInputFormatTest { } @Test + public void testTailingEmptyFields() throws Exception { + final String fileContent = "aa,bb,cc\n" + // ok + "aa,bb,\n" + // the last field is empty + "aa,,\n" + // the last two fields are empty + ",,\n" + // all fields are empty + "aa,bb"; // row too short + final FileInputSplit split = createTempFile(fileContent); + + final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = + TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class); + final CsvInputFormat<Tuple3<String, String, String>> format = + new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo); + + format.setFieldDelimiter(","); + + format.configure(new Configuration()); + format.open(split); + + Tuple3<String, String, String> result = new Tuple3<String, String, String>(); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("aa", result.f0); + assertEquals("bb", result.f1); + assertEquals("cc", result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("aa", result.f0); + assertEquals("bb", result.f1); + assertEquals("", result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("aa", result.f0); + assertEquals("", result.f1); + assertEquals("", result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.f0); + assertEquals("", result.f1); + assertEquals("", result.f2); + + try { + format.nextRecord(result); + fail("Parse Exception was not thrown! (Row too short)"); + } catch (ParseException e) {} + } + + @Test public void testIntegerFields() throws IOException { try { final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"; @@ -957,6 +1008,34 @@ public class CsvInputFormatTest { } @Test + public void testPojoTypeWithTrailingEmptyFields() throws Exception { + final String fileContent = "123,,3.123,,\n456,BBB,3.23,,"; + final FileInputSplit split = createTempFile(fileContent); + + @SuppressWarnings("unchecked") + PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class); + CsvInputFormat<PrivatePojoItem> inputFormat = new PojoCsvInputFormat<PrivatePojoItem>(PATH, typeInfo); + + inputFormat.configure(new Configuration()); + inputFormat.open(split); + + PrivatePojoItem item = new PrivatePojoItem(); + inputFormat.nextRecord(item); + + assertEquals(123, item.field1); + assertEquals("", item.field2); + assertEquals(Double.valueOf(3.123), item.field3); + assertEquals("", item.field4); + + inputFormat.nextRecord(item); + + assertEquals(456, item.field1); + assertEquals("BBB", item.field2); + assertEquals(Double.valueOf(3.23), item.field3); + assertEquals("", item.field4); + } + + @Test public void testPojoTypeWithMappingInformation() throws Exception { File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); tempFile.deleteOnExit(); http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java index b819641..f6bda30 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java @@ -230,7 +230,7 @@ public class RowCsvInputFormatTest { @Test public void readStringFields() throws Exception { - String fileContent = "abc|def|ghijk\nabc||hhg\n|||"; + String fileContent = "abc|def|ghijk\nabc||hhg\n|||\n||"; FileInputSplit split = createTempFile(fileContent); @@ -264,13 +264,19 @@ public class RowCsvInputFormatTest { assertEquals("", result.getField(2)); result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); assertNull(result); assertTrue(format.reachedEnd()); } @Test public void readMixedQuotedStringFields() throws Exception { - String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||"; + String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||\n"; FileInputSplit split = createTempFile(fileContent); @@ -351,6 +357,65 @@ public class RowCsvInputFormatTest { } @Test + public void testTailingEmptyFields() throws Exception { + String fileContent = "abc|-def|-ghijk\n" + + "abc|-def|-\n" + + "abc|-|-\n" + + "|-|-|-\n" + + "|-|-\n" + + "abc|-def\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|"); + format.setFieldDelimiter("|-"); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("ghijk", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + try { + format.nextRecord(result); + fail("Parse Exception was not thrown! (Row too short)"); + } catch (ParseException e) {} + } + + @Test public void testIntegerFields() throws Exception { String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"; @@ -396,12 +461,12 @@ public class RowCsvInputFormatTest { public void testEmptyFields() throws Exception { String fileContent = ",,,,,,,,\n" + + ",,,,,,,\n" + ",,,,,,,,\n" + + ",,,,,,,\n" + ",,,,,,,,\n" + ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" + - ",,,,,,,,\n" + + ",,,,,,,\n" + ",,,,,,,,\n"; FileInputSplit split = createTempFile(fileContent);
