Repository: flink
Updated Branches:
  refs/heads/master 867be9b6a -> fe0eb602d


[FLINK-3908] [core] Fix resetting of FieldParser error state.

This closes #2007


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8747978
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8747978
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8747978

Branch: refs/heads/master
Commit: d8747978386c0aacce1dc2631e1c5602ec9b2a00
Parents: 867be9b
Author: Flavio Pompermaier <f.pomperma...@gmail.com>
Authored: Thu May 19 11:41:43 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Jun 15 23:43:26 2016 +0200

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    |  2 +-
 .../flink/types/parser/ByteValueParser.java     |  2 +-
 .../apache/flink/types/parser/FieldParser.java  | 28 +++++++++++++++++---
 .../flink/types/parser/IntValueParser.java      |  2 +-
 .../apache/flink/types/parser/LongParser.java   |  4 +--
 .../flink/types/parser/LongValueParser.java     |  4 +--
 .../apache/flink/types/parser/ShortParser.java  |  2 +-
 .../flink/types/parser/ShortValueParser.java    |  2 +-
 .../apache/flink/types/parser/StringParser.java |  8 +++---
 .../flink/types/parser/StringValueParser.java   |  8 +++---
 .../flink/api/java/io/PrimitiveInputFormat.java |  2 +-
 11 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 31d42ff..e2c54ad 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -392,7 +392,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                                @SuppressWarnings("unchecked")
                                FieldParser<Object> parser = 
(FieldParser<Object>) this.fieldParsers[output];
                                Object reuse = holders[output];
-                               startPos = parser.parseField(bytes, startPos, 
limit, this.fieldDelim, reuse);
+                               startPos = 
parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse);
                                holders[output] = parser.getLastResult();
                                
                                // check parse result

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index 416a498..4cda98c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -38,7 +38,7 @@ public class ByteValueParser extends FieldParser<ByteValue> {
                
                this.result = reusable;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
                
                if (bytes[startPos] == '-') {
                        neg = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 57689a8..67c1bd7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -79,8 +79,9 @@ public abstract class FieldParser<T> {
        private ParseErrorState errorState = ParseErrorState.NONE;
        
        /**
-        * Parses the value of a field from the byte array.
-        * The start position within the byte array and the array's valid 
length is given. 
+        * Parses the value of a field from the byte array, taking care of 
properly reset
+        * the state of this parser.
+        * The start position within the byte array and the array's valid 
length is given.
         * The content of the value is delimited by a field delimiter.
         * 
         * @param bytes The byte array that holds the value.
@@ -92,8 +93,27 @@ public abstract class FieldParser<T> {
         * 
         * @return The index of the next delimiter, if the field was parsed 
correctly. A value less than 0 otherwise.
         */
-       public abstract int parseField(byte[] bytes, int startPos, int limit, 
byte[] delim, T reuse);
-       
+       public int resetErrorStateAndParse(byte[] bytes, int startPos, int 
limit, byte[] delim, T reuse) {
+               resetParserState();
+               return parseField(bytes, startPos, limit, delim, reuse);
+       }
+
+       /**
+        * Each parser's logic should be implemented inside this method
+        *
+        * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)}
+        * */
+       protected abstract int parseField(byte[] bytes, int startPos, int 
limit, byte[] delim, T reuse);
+
+       /**
+        * Reset the state of the parser. Called as the very first method inside
+        * {@link FieldParser#resetErrorStateAndParse(byte[], int, int, byte[], 
Object)}, by default it just reset
+        * its error state.
+        * */
+       protected void resetParserState() {
+               this.errorState = ParseErrorState.NONE;
+       }
+
        /**
         * Gets the parsed field. This method returns the value parsed by the 
last successful invocation of
         * {@link #parseField(byte[], int, int, byte[], Object)}. It objects 
are mutable and reused, it will return

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index d493e2d..d487c66 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -39,7 +39,7 @@ public class IntValueParser extends FieldParser<IntValue> {
                long val = 0;
                boolean neg = false;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                this.result = reusable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index e8ac09b..c7b76d2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -72,7 +72,7 @@ public class LongParser extends FieldParser<Long> {
 
                                        if (i+1 >= limit) {
                                                return limit;
-                                       } else if (i+1 < delimLimit && 
delimiterNext(bytes, i+1, delimiter)) {
+                                       } else if (i + 1 < delimLimit && 
delimiterNext(bytes, i + 1, delimiter)) {
                                                return i + 1 + delimiter.length;
                                        } else {
                                                
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
@@ -160,7 +160,7 @@ public class LongParser extends FieldParser<Long> {
                        if (val < 0) {
                                // this is an overflow/underflow, unless we hit 
exactly the Long.MIN_VALUE
                                if (neg && val == Long.MIN_VALUE) {
-                                       if (length == 1 || bytes[startPos+1] == 
delimiter) {
+                                       if (length == 1 || bytes[startPos + 1] 
== delimiter) {
                                                return Long.MIN_VALUE;
                                        } else {
                                                throw new 
NumberFormatException("value overflow");

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index eaaf619..597abc0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -36,7 +36,7 @@ public class LongValueParser extends FieldParser<LongValue> {
                long val = 0;
                boolean neg = false;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                this.result = reusable;
                
@@ -75,7 +75,7 @@ public class LongValueParser extends FieldParser<LongValue> {
                                        
                                        if (i+1 >= limit) {
                                                return limit;
-                                       } else if (i+1 < delimLimit && 
delimiterNext(bytes, i+1, delimiter)) {
+                                       } else if (i + 1 < delimLimit && 
delimiterNext(bytes, i + 1, delimiter)) {
                                                return i + 1 + delimiter.length;
                                        } else {
                                                
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index 6f1fc7b..3afa761 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -39,7 +39,7 @@ public class ShortParser extends FieldParser<Short> {
                int val = 0;
                boolean neg = false;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                if (bytes[startPos] == '-') {
                        neg = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index c1df9ce..880af25 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -39,7 +39,7 @@ public class ShortValueParser extends FieldParser<ShortValue> 
{
                int val = 0;
                boolean neg = false;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
                
                this.result = reusable;
                

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 7dc80e5..9cee990 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -44,14 +44,14 @@ public class StringParser extends FieldParser<String> {
 
                int i = startPos;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                if(quotedStringParsing && bytes[i] == quoteCharacter) {
                        // quoted string parsing enabled and first character 
is a quote
                        i++;
 
                        // search for ending quote character, continue when it 
is escaped
-                       while (i < limit && (bytes[i] != quoteCharacter || 
bytes[i-1] == BACKSLASH)){
+                       while (i < limit && (bytes[i] != quoteCharacter || 
bytes[i - 1] == BACKSLASH)) {
                                i++;
                        }
 
@@ -63,11 +63,11 @@ public class StringParser extends FieldParser<String> {
                                // check for proper termination
                                if (i == limit) {
                                        // either by end of line
-                                       this.result = new String(bytes, 
startPos+1, i - startPos - 2);
+                                       this.result = new String(bytes, 
startPos + 1, i - startPos - 2);
                                        return limit;
                                } else if ( i < delimLimit && 
delimiterNext(bytes, i, delimiter)) {
                                        // or following field delimiter
-                                       this.result = new String(bytes, 
startPos+1, i - startPos - 2);
+                                       this.result = new String(bytes, 
startPos + 1, i - startPos - 2);
                                        return i + delimiter.length;
                                } else {
                                        // no proper termination

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
index 586c2cc..bc89952 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
@@ -48,14 +48,14 @@ public class StringValueParser extends 
FieldParser<StringValue> {
                this.result = reusable;
                int i = startPos;
 
-               final int delimLimit = limit-delimiter.length+1;
+               final int delimLimit = limit - delimiter.length + 1;
 
                if(quotedStringParsing == true && bytes[i] == quoteCharacter) {
                        // quoted string parsing enabled and first character is 
a quote
                        i++;
 
                        // search for ending quote character, continue when it 
is escaped
-                       while (i < limit && (bytes[i] != quoteCharacter || 
bytes[i-1] == BACKSLASH)){
+                       while (i < limit && (bytes[i] != quoteCharacter || 
bytes[i - 1] == BACKSLASH)) {
                                i++;
                        }
 
@@ -67,11 +67,11 @@ public class StringValueParser extends 
FieldParser<StringValue> {
                                // check for proper termination
                                if (i == limit) {
                                        // either by end of line
-                                       reusable.setValueAscii(bytes, 
startPos+1, i - startPos - 2);
+                                       reusable.setValueAscii(bytes, startPos 
+ 1, i - startPos - 2);
                                        return limit;
                                } else if ( i < delimLimit && 
delimiterNext(bytes, i, delimiter)) {
                                        // or following field delimiter
-                                       reusable.setValueAscii(bytes, 
startPos+1, i - startPos - 2);
+                                       reusable.setValueAscii(bytes, startPos 
+ 1, i - startPos - 2);
                                        return i + delimiter.length;
                                } else {
                                        // no proper termination

http://git-wip-us.apache.org/repos/asf/flink/blob/d8747978/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 3dbc966..75b82cd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -75,7 +75,7 @@ public class PrimitiveInputFormat<OT> extends 
DelimitedInputFormat<OT> {
                }
 
                // Null character as delimiter is used because there's only 1 
field to be parsed
-               if (parser.parseField(bytes, offset, numBytes + offset, new 
byte[]{'\0'}, reuse) >= 0) {
+               if (parser.resetErrorStateAndParse(bytes, offset, numBytes + 
offset, new byte[]{'\0'}, reuse) >= 0) {
                        return parser.getLastResult();
                } else {
                        String s = new String(bytes, offset, numBytes);

Reply via email to