Repository: flink Updated Branches: refs/heads/master 867be9b6a -> fe0eb602d
[FLINK-3908] [core] Fix resetting of FieldParser error state. This closes #2007 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8747978 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8747978 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8747978 Branch: refs/heads/master Commit: d8747978386c0aacce1dc2631e1c5602ec9b2a00 Parents: 867be9b Author: Flavio Pompermaier <f.pomperma...@gmail.com> Authored: Thu May 19 11:41:43 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Jun 15 23:43:26 2016 +0200 ---------------------------------------------------------------------- .../api/common/io/GenericCsvInputFormat.java | 2 +- .../flink/types/parser/ByteValueParser.java | 2 +- .../apache/flink/types/parser/FieldParser.java | 28 +++++++++++++++++--- .../flink/types/parser/IntValueParser.java | 2 +- .../apache/flink/types/parser/LongParser.java | 4 +-- .../flink/types/parser/LongValueParser.java | 4 +-- .../apache/flink/types/parser/ShortParser.java | 2 +- .../flink/types/parser/ShortValueParser.java | 2 +- .../apache/flink/types/parser/StringParser.java | 8 +++--- .../flink/types/parser/StringValueParser.java | 8 +++--- .../flink/api/java/io/PrimitiveInputFormat.java | 2 +- 11 files changed, 42 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 31d42ff..e2c54ad 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -392,7 +392,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> @SuppressWarnings("unchecked") FieldParser<Object> parser = (FieldParser<Object>) this.fieldParsers[output]; Object reuse = holders[output]; - startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse); + startPos = parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse); holders[output] = parser.getLastResult(); // check parse result http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java index 416a498..4cda98c 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java @@ -38,7 +38,7 @@ public class ByteValueParser extends FieldParser<ByteValue> { this.result = reusable; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; if (bytes[startPos] == '-') { neg = true; http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 57689a8..67c1bd7 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -79,8 +79,9 @@ public abstract class FieldParser<T> { private 
ParseErrorState errorState = ParseErrorState.NONE; /** - * Parses the value of a field from the byte array. - * The start position within the byte array and the array's valid length is given. + * Parses the value of a field from the byte array, taking care to properly reset + * the state of this parser. + * The start position within the byte array and the array's valid length is given. * The content of the value is delimited by a field delimiter. * * @param bytes The byte array that holds the value. @@ -92,8 +93,27 @@ public abstract class FieldParser<T> { * * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise. */ - public abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); - + public int resetErrorStateAndParse(byte[] bytes, int startPos, int limit, byte[] delim, T reuse) { + resetParserState(); + return parseField(bytes, startPos, limit, delim, reuse); + } + + /** + * Each parser's logic should be implemented inside this method. + * + * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)} + * */ + protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); + + /** + * Resets the state of the parser. Called as the very first method inside + * {@link FieldParser#resetErrorStateAndParse(byte[], int, int, byte[], Object)}, by default it just resets + * its error state. + * */ + protected void resetParserState() { + this.errorState = ParseErrorState.NONE; + } + /** * Gets the parsed field. This method returns the value parsed by the last successful invocation of * {@link #parseField(byte[], int, int, byte[], Object)}.
It objects are mutable and reused, it will return http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java index d493e2d..d487c66 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java @@ -39,7 +39,7 @@ public class IntValueParser extends FieldParser<IntValue> { long val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; this.result = reusable; http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java index e8ac09b..c7b76d2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java @@ -72,7 +72,7 @@ public class LongParser extends FieldParser<Long> { if (i+1 >= limit) { return limit; - } else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) { + } else if (i + 1 < delimLimit && delimiterNext(bytes, i + 1, delimiter)) { return i + 1 + delimiter.length; } else { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); @@ -160,7 +160,7 @@ public class LongParser extends FieldParser<Long> { if (val < 0) { // this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE if (neg && val == Long.MIN_VALUE) { - if (length == 1 || bytes[startPos+1] == 
delimiter) { + if (length == 1 || bytes[startPos + 1] == delimiter) { return Long.MIN_VALUE; } else { throw new NumberFormatException("value overflow"); http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java index eaaf619..597abc0 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java @@ -36,7 +36,7 @@ public class LongValueParser extends FieldParser<LongValue> { long val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; this.result = reusable; @@ -75,7 +75,7 @@ public class LongValueParser extends FieldParser<LongValue> { if (i+1 >= limit) { return limit; - } else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) { + } else if (i + 1 < delimLimit && delimiterNext(bytes, i + 1, delimiter)) { return i + 1 + delimiter.length; } else { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java index 6f1fc7b..3afa761 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java @@ -39,7 +39,7 @@ public class ShortParser extends FieldParser<Short> { int val = 0; boolean neg = false; - final 
int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; if (bytes[startPos] == '-') { neg = true; http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java index c1df9ce..880af25 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java @@ -39,7 +39,7 @@ public class ShortValueParser extends FieldParser<ShortValue> { int val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; this.result = reusable; http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 7dc80e5..9cee990 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -44,14 +44,14 @@ public class StringParser extends FieldParser<String> { int i = startPos; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; if(quotedStringParsing && bytes[i] == quoteCharacter) { // quoted string parsing enabled and first character Vis a quote i++; // search for ending quote character, continue when it is escaped - while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){ + while (i < 
limit && (bytes[i] != quoteCharacter || bytes[i - 1] == BACKSLASH)) { i++; } @@ -63,11 +63,11 @@ public class StringParser extends FieldParser<String> { // check for proper termination if (i == limit) { // either by end of line - this.result = new String(bytes, startPos+1, i - startPos - 2); + this.result = new String(bytes, startPos + 1, i - startPos - 2); return limit; } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { // or following field delimiter - this.result = new String(bytes, startPos+1, i - startPos - 2); + this.result = new String(bytes, startPos + 1, i - startPos - 2); return i + delimiter.length; } else { // no proper termination http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index 586c2cc..bc89952 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -48,14 +48,14 @@ public class StringValueParser extends FieldParser<StringValue> { this.result = reusable; int i = startPos; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit - delimiter.length + 1; if(quotedStringParsing == true && bytes[i] == quoteCharacter) { // quoted string parsing enabled and first character is a quote i++; // search for ending quote character, continue when it is escaped - while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){ + while (i < limit && (bytes[i] != quoteCharacter || bytes[i - 1] == BACKSLASH)) { i++; } @@ -67,11 +67,11 @@ public class StringValueParser extends FieldParser<StringValue> { // check for proper termination if (i == limit) { // either by 
end of line - reusable.setValueAscii(bytes, startPos+1, i - startPos - 2); + reusable.setValueAscii(bytes, startPos + 1, i - startPos - 2); return limit; } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { // or following field delimiter - reusable.setValueAscii(bytes, startPos+1, i - startPos - 2); + reusable.setValueAscii(bytes, startPos + 1, i - startPos - 2); return i + delimiter.length; } else { // no proper termination http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java index 3dbc966..75b82cd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java @@ -75,7 +75,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> { } // Null character as delimiter is used because there's only 1 field to be parsed - if (parser.parseField(bytes, offset, numBytes + offset, new byte[]{'\0'}, reuse) >= 0) { + if (parser.resetErrorStateAndParse(bytes, offset, numBytes + offset, new byte[]{'\0'}, reuse) >= 0) { return parser.getLastResult(); } else { String s = new String(bytes, offset, numBytes);