[FLINK-1168] Add support for multi-char field delimiters in CSVInputFormats.
This commit includes parts of Cbro's pull request and subsumes PR #247

This closes #247
This closes #264


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0548a93d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0548a93d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0548a93d

Branch: refs/heads/master
Commit: 0548a93dfc555a5403590f147d4850c730facaf6
Parents: 06b2acf
Author: Fabian Hueske <[email protected]>
Authored: Mon Oct 20 15:18:20 2014 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Mon Jan 26 14:50:44 2015 +0100

----------------------------------------------------------------------
 .../flink/api/avro/AvroOutputFormatTest.java    |   2 +-
 .../program/ExecutionPlanCreationTest.java      |   2 +-
 .../api/common/io/DelimitedInputFormat.java     |  62 +-------
 .../api/common/io/GenericCsvInputFormat.java    |  52 +++---
 .../apache/flink/types/parser/ByteParser.java   |  12 +-
 .../flink/types/parser/ByteValueParser.java     |  13 +-
 .../apache/flink/types/parser/DoubleParser.java |  12 +-
 .../flink/types/parser/DoubleValueParser.java   |  14 +-
 .../apache/flink/types/parser/FieldParser.java  |  33 +++-
 .../apache/flink/types/parser/FloatParser.java  |  14 +-
 .../flink/types/parser/FloatValueParser.java    |  14 +-
 .../apache/flink/types/parser/IntParser.java    |  14 +-
 .../flink/types/parser/IntValueParser.java      |  14 +-
 .../apache/flink/types/parser/LongParser.java   |  14 +-
 .../flink/types/parser/LongValueParser.java     |  22 +--
 .../apache/flink/types/parser/ShortParser.java  |  10 +-
 .../flink/types/parser/ShortValueParser.java    |  10 +-
 .../apache/flink/types/parser/StringParser.java |  44 +++---
 .../flink/types/parser/StringValueParser.java   |  46 +++---
 .../common/io/GenericCsvInputFormatTest.java    |  89 +++++++++--
 .../flink/types/parser/ByteParserTest.java      |   4 +-
 .../flink/types/parser/ByteValueParserTest.java |   4 +-
 .../flink/types/parser/DoubleParserTest.java    |   4 +-
 .../types/parser/DoubleValueParserTest.java     |   4 +-
 .../flink/types/parser/FloatParserTest.java     |   4 +-
 .../types/parser/FloatValueParserTest.java      |   4 +-
 .../flink/types/parser/IntParserTest.java       |   4 +-
 .../flink/types/parser/IntValueParserTest.java  |   4 +-
 .../flink/types/parser/LongParserTest.java      |   5 +-
 .../flink/types/parser/LongValueParserTest.java |   4 +-
 .../flink/types/parser/ParserTestBase.java      | 157 +++++++++++++++----
 .../flink/types/parser/ShortParserTest.java     |   4 +-
 .../types/parser/ShortValueParserTest.java      |   5 +-
 .../flink/types/parser/StringParserTest.java    |  10 +-
 .../types/parser/StringValueParserTest.java     |   8 +-
 .../types/parser/VarLengthStringParserTest.java |  50 +++---
 .../flink/examples/java/clustering/KMeans.java  |   4 +-
 .../java/graph/ConnectedComponents.java         |   2 +-
 .../examples/java/graph/EnumTrianglesBasic.java |   2 +-
 .../examples/java/graph/EnumTrianglesOpt.java   |   2 +-
 .../examples/java/graph/PageRankBasic.java      |   4 +-
 .../java/graph/TransitiveClosureNaive.java      |   2 +-
 .../examples/java/ml/LinearRegression.java      |   2 +-
 .../relational/EmptyFieldsCountAccumulator.java |   2 +-
 .../examples/java/relational/TPCHQuery10.java   |   8 +-
 .../examples/java/relational/TPCHQuery3.java    |   6 +-
 .../java/relational/WebLogAnalysis.java         |   6 +-
 .../examples/scala/clustering/KMeans.scala      |   4 +-
 .../scala/graph/ConnectedComponents.scala       |   2 +-
 .../scala/graph/EnumTrianglesBasic.scala        |   2 +-
 .../examples/scala/graph/EnumTrianglesOpt.scala |   2 +-
 .../examples/scala/graph/PageRankBasic.scala    |   4 +-
 .../scala/graph/TransitiveClosureNaive.scala    |   2 +-
 .../examples/scala/ml/LinearRegression.scala    |   2 +-
 .../examples/scala/relational/TPCHQuery10.scala |   8 +-
 .../examples/scala/relational/TPCHQuery3.scala  |   6 +-
 .../scala/relational/WebLogAnalysis.scala       |   6 +-
 .../flink/api/java/io/CsvInputFormat.java       |   4 +-
 .../org/apache/flink/api/java/io/CsvReader.java |  15 +-
 .../flink/api/java/io/PrimitiveInputFormat.java |   2 +-
 .../api/java/record/io/CsvInputFormat.java      |   7 +-
 .../flink/api/java/io/CsvInputFormatTest.java   |  32 ++--
 .../api/java/record/io/CsvInputFormatTest.java  |   2 +-
 .../examples/java8/relational/TPCHQuery10.java  |   8 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |   4 +-
 .../ConnectedComponentsWithObjectMapITCase.java |   2 +-
 .../flink/test/util/testjar/KMeansForTest.java  |   4 +-
 .../flink/api/scala/io/CsvInputFormatTest.scala |  17 +-
 68 files changed, 564 insertions(+), 379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index 42c1702..a8bace3 100644
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -63,7 +63,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase 
{
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple3<String, Integer, String>> input = 
env.readCsvFile(inputPath)
-                       .fieldDelimiter('|')
+                       .fieldDelimiter("|")
                        .types(String.class, Integer.class, String.class);
 
                //output the data with AvroOutputFormat for specific user type

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index 83bdc8d..c6cada8 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -76,7 +76,7 @@ public class ExecutionPlanCreationTest {
                        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                        
                        DataSet<Tuple2<Long, Long>> input = 
env.readCsvFile(args[0])
-                                       .fieldDelimiter('\t').types(Long.class, 
Long.class);
+                                       .fieldDelimiter("\t").types(Long.class, 
Long.class);
                        
                        DataSet<Tuple2<Long, Long>> result = input.map(
                                        new MapFunction<Tuple2<Long,Long>, 
Tuple2<Long,Long>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 52edfa7..d2b4e83 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,9 +20,6 @@
 package org.apache.flink.api.common.io;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
 import java.util.ArrayList;
 
 import org.slf4j.Logger;
@@ -188,27 +185,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        }
        
        public void setDelimiter(String delimiter) {
-               setDelimiter(delimiter, Charsets.UTF_8);
-       }
-       
-       public void setDelimiter(String delimiter, String charsetName) throws 
IllegalCharsetNameException, UnsupportedCharsetException {
-               if (charsetName == null) {
-                       throw new IllegalArgumentException("Charset name must 
not be null");
-               }
-               
-               Charset charset = Charset.forName(charsetName);
-               setDelimiter(delimiter, charset);
-       }
-       
-       public void setDelimiter(String delimiter, Charset charset) {
-               if (delimiter == null) {
-                       throw new IllegalArgumentException("Delimiter must not 
be null");
-               }
-               if (charset == null) {
-                       throw new IllegalArgumentException("Charset must not be 
null");
-               }
-               
-               this.delimiter = delimiter.getBytes(charset);
+               this.delimiter = delimiter.getBytes(Charsets.UTF_8);
        }
        
        public int getLineLengthLimit() {
@@ -281,19 +258,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                
                String delimString = parameters.getString(RECORD_DELIMITER, 
null);
                if (delimString != null) {
-                       String charsetName = 
parameters.getString(RECORD_DELIMITER_ENCODING, null);
-
-                       if (charsetName == null) {
-                               setDelimiter(delimString);
-                       } else {
-                               try {
-                                       setDelimiter(delimString, charsetName);
-                               }
-                               catch (UnsupportedCharsetException e) {
-                                       throw new IllegalArgumentException("The 
charset with the name '" + charsetName + 
-                                                       "' is not supported on 
this TaskManager instance.", e);
-                               }
-                       }
+                       setDelimiter(delimString);
                }
                
                // set the number of samples
@@ -639,11 +604,6 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
        protected static final String RECORD_DELIMITER = 
"delimited-format.delimiter";
        
        /**
-        * The configuration key to set the record delimiter encoding.
-        */
-       private static final String RECORD_DELIMITER_ENCODING = 
"delimited-format.delimiter-encoding";
-       
-       /**
         * The configuration key to set the number of samples to take for the 
statistics.
         */
        private static final String NUM_STATISTICS_SAMPLES = 
"delimited-format.numSamples";
@@ -713,24 +673,6 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> {
                }
                
                /**
-                * Sets the delimiter to be the given string. The string will 
be converted to bytes for more efficient
-                * comparison during input parsing. The conversion will be done 
using the charset with the given name.
-                * The charset must be available on the processing nodes, 
otherwise an exception will be raised at
-                * runtime.
-                * 
-                * @param delimiter The delimiter string.
-                * @param charsetName The name of the encoding character set.
-                * @return The builder itself.
-                */
-               public T recordDelimiter(String delimiter, String charsetName) {
-                       this.config.setString(RECORD_DELIMITER, delimiter);
-                       this.config.setString(RECORD_DELIMITER_ENCODING, 
charsetName);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-               
-               /**
                 * Sets the number of line samples to take in order to estimate 
the base statistics for the
                 * input format.
                 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index a6ee1da..151b1e2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.api.common.io;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -38,7 +39,9 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        
        private static final boolean[] EMPTY_INCLUDED = new boolean[0];
        
-       private static final char DEFAULT_FIELD_DELIMITER = ',';
+       private static final byte[] DEFAULT_FIELD_DELIMITER = new byte[] {','};
+
+       private static final char QUOTE_CHARACTER = '"';
        
        
        // 
--------------------------------------------------------------------------------------------
@@ -57,7 +60,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        
        private boolean[] fieldIncluded = EMPTY_INCLUDED;
                
-       private char fieldDelim = DEFAULT_FIELD_DELIMITER;
+       private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;
        
        private boolean lenient;
        
@@ -86,16 +89,24 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                return this.fieldTypes.length;
        }
 
-       public char getFieldDelimiter() {
+       public byte[] getFieldDelimiter() {
                return fieldDelim;
        }
 
-       public void setFieldDelimiter(char fieldDelim) {
-               if (fieldDelim > Byte.MAX_VALUE) {
-                       throw new IllegalArgumentException("The field delimiter 
must be an ASCII character.");
+       public void setFieldDelimiter(byte[] delimiter) {
+               if (delimiter == null) {
+                       throw new IllegalArgumentException("Delimiter must not 
be null");
                }
-               
-               this.fieldDelim = fieldDelim;
+
+               this.fieldDelim = delimiter;
+       }
+
+       public void setFieldDelimiter(char delimiter) {
+               setFieldDelimiter(String.valueOf(delimiter));
+       }
+
+       public void setFieldDelimiter(String delimiter) {
+               this.fieldDelim = delimiter.getBytes(Charsets.UTF_8);
        }
 
        public boolean isLenient() {
@@ -308,7 +319,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                        }
                        else {
                                // skip field
-                               startPos = skipFields(bytes, startPos, limit, 
fieldDelim);
+                               startPos = skipFields(bytes, startPos, limit, 
this.fieldDelim);
                                if (startPos < 0) {
                                        if (!lenient) {
                                                String lineAsString = new 
String(bytes, offset, numBytes);
@@ -325,7 +336,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        private String fieldTypesToString() {
                StringBuilder string = new StringBuilder();
                string.append(this.fieldTypes[0].toString());
-               
+
                for (int i = 1; i < this.fieldTypes.length; i++) {
                        string.append(", ").append(this.fieldTypes[i]);
                }
@@ -333,11 +344,12 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                return string.toString();
        }
 
-       protected int skipFields(byte[] bytes, int startPos, int limit, char 
delim) {
+       protected int skipFields(byte[] bytes, int startPos, int limit, byte[] 
delim) {
+
                int i = startPos;
                
-               final byte delByte = (byte) delim;
                byte current;
+               final int delimLimit = limit - delim.length + 1;
                
                // skip over initial whitespace lines
                while (i < limit && ((current = bytes[i]) == ' ' || current == 
'\t')) {
@@ -345,11 +357,11 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                }
                
                // first none whitespace character
-               if (i < limit && bytes[i] == '"') {
+               if (i < limit && bytes[i] == QUOTE_CHARACTER) {
                        // quoted string
                        i++; // the quote
                        
-                       while (i < limit && bytes[i] != '"') {
+                       while (i < limit && bytes[i] != QUOTE_CHARACTER) {
                                i++;
                        }
                        
@@ -358,16 +370,16 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                                i++; // the quote
                                
                                // skip trailing whitespace characters 
-                               while (i < limit && (current = bytes[i]) != 
delByte) {
+                               while (i < delimLimit && 
!FieldParser.delimiterNext(bytes, i, delim)) {
+                                       current = bytes[i];
                                        if (current == ' ' || current == '\t') {
                                                i++;
-                                       }
-                                       else {
+                                       } else {
                                                return -1;      // illegal case 
of non-whitespace characters trailing
                                        }
                                }
                                
-                               return (i == limit ? limit : i+1);
+                               return (i >= delimLimit ? limit : i + 
delim.length);
                        } else {
                                // exited due to line end without quote 
termination
                                return -1;
@@ -375,10 +387,10 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                }
                else {
                        // unquoted field
-                       while (i < limit && bytes[i] != delByte) {
+                       while (i < delimLimit && 
!FieldParser.delimiterNext(bytes, i, delim)) {
                                i++;
                        }
-                       return (i == limit ? limit : i+1);
+                       return (i >= delimLimit ? limit : i + delim.length);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index 4be4d7f..5858da2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -25,25 +25,27 @@ public class ByteParser extends FieldParser<Byte> {
        private byte result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, Byte reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Byte reusable) {
                int val = 0;
                boolean neg = false;
+
+               final int delimLimit = limit-delimiter.length+1;
                
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
-               
+
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                this.result = (byte) (neg ? -val : val);
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index 85fefb6..f9b36e4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -30,27 +30,30 @@ public class ByteValueParser extends FieldParser<ByteValue> 
{
        private ByteValue result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, ByteValue reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, ByteValue reusable) {
                int val = 0;
                boolean neg = false;
                
                this.result = reusable;
+
+               final int delimLimit = limit-delimiter.length+1;
                
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
-               
+
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                reusable.setValue((byte) (neg ? -val : val));
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index df768e3..947fdfe 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -29,18 +29,22 @@ public class DoubleParser extends FieldParser<Double> {
        private double result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, Double reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Double reusable) {
                int i = startPos;
-               final byte delByte = (byte) delimiter;
+
+               final int delimLimit = limit-delimiter.length+1;
                
-               while (i < limit && bytes[i] != delByte) {
+               while (i < limit) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               break;
+                       }
                        i++;
                }
                
                String str = new String(bytes, startPos, i-startPos);
                try {
                        this.result = Double.parseDouble(str);
-                       return (i == limit) ? limit : i+1;
+                       return (i == limit) ? limit : i + delimiter.length;
                }
                catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 2f12a0e..e225c1f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -29,12 +29,16 @@ public class DoubleValueParser extends 
FieldParser<DoubleValue> {
        private DoubleValue result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delim, DoubleValue reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, DoubleValue reusable) {
                
                int i = startPos;
-               final byte delByte = (byte) delim;
-               
-               while (i < limit && bytes[i] != delByte) {
+
+               final int delimLimit = limit-delimiter.length+1;
+
+               while (i < limit) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               break;
+                       }
                        i++;
                }
                
@@ -43,7 +47,7 @@ public class DoubleValueParser extends 
FieldParser<DoubleValue> {
                        double value = Double.parseDouble(str);
                        reusable.setValue(value);
                        this.result = reusable;
-                       return (i == limit) ? limit : i+1;
+                       return (i == limit) ? limit : i + delimiter.length;
                }
                catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 44a6014..322f48e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -78,16 +78,16 @@ public abstract class FieldParser<T> {
         * @param startPos The index where the field starts
         * @param limit The limit unto which the byte contents is valid for the 
parser. The limit is the
         *              position one after the last valid byte.
-        * @param delim Field delimiter character
-        * @param reuse The an optional reusable field to hold the value
+        * @param delim The field delimiter byte sequence
+        * @param reuse An optional reusable field to hold the value
         * 
         * @return The index of the next delimiter, if the field was parsed 
correctly. A value less than 0 otherwise.
         */
-       public abstract int parseField(byte[] bytes, int startPos, int limit, 
char delim, T reuse);
+       public abstract int parseField(byte[] bytes, int startPos, int limit, 
byte[] delim, T reuse);
        
        /**
         * Gets the parsed field. This method returns the value parsed by the 
last successful invocation of
-        * {@link #parseField(byte[], int, int, char, Object)}. It objects are 
mutable and reused, it will return
+        * {@link #parseField(byte[], int, int, byte[], Object)}. If objects 
are mutable and reused, it will return
         * the object instance that was passed the the parse function.
         * 
         * @return The latest parsed field.
@@ -102,6 +102,29 @@ public abstract class FieldParser<T> {
        public abstract T createValue();
        
        /**
+        * Checks if the delimiter starts at the given start position of the 
byte array.
+        * 
+        * Attention: This method assumes that enough characters follow the 
start position for the delimiter check!
+        * 
+        * @param bytes The byte array that holds the value.
+        * @param startPos The index of the byte array where the check for the 
delimiter starts.
+        * @param delim The delimiter to check for.
+        * 
+        * @return true if a delimiter starts at the given start position, 
false otherwise.
+        */
+       public static final boolean delimiterNext(byte[] bytes, int startPos, 
byte[] delim) {
+
+               for(int pos = 0; pos < delim.length; pos++) {
+                       // check each position
+                       if(delim[pos] != bytes[startPos+pos]) {
+                               return false;
+                       }
+               }
+               return true;
+               
+       }
+       
+       /**
         * Sets the error state of the parser. Called by subclasses of the 
parser to set the type of error
         * when failing a parse.
         * 
@@ -155,7 +178,7 @@ public abstract class FieldParser<T> {
                PARSERS.put(String.class, StringParser.class);
                PARSERS.put(Float.class, FloatParser.class);
                PARSERS.put(Double.class, DoubleParser.class);
-               
+
                // value types
                PARSERS.put(ByteValue.class, ByteValueParser.class);
                PARSERS.put(ShortValue.class, ShortValueParser.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index 6d85d82..7d166c7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -27,19 +27,23 @@ public class FloatParser extends FieldParser<Float> {
        private float result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delim, Float reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Float reusable) {
                
                int i = startPos;
-               final byte delByte = (byte) delim;
-               
-               while (i < limit && bytes[i] != delByte) {
+
+               final int delimLimit = limit-delimiter.length+1;
+
+               while (i < limit) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               break;
+                       }
                        i++;
                }
                
                String str = new String(bytes, startPos, i-startPos);
                try {
                        this.result = Float.parseFloat(str);
-                       return (i == limit) ? limit : i+1;
+                       return (i == limit) ? limit : i+ delimiter.length;
                }
                catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index a682301..af16d4c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -29,12 +29,16 @@ public class FloatValueParser extends 
FieldParser<FloatValue> {
        private FloatValue result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delim, FloatValue reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, FloatValue reusable) {
                
                int i = startPos;
-               final byte delByte = (byte) delim;
-               
-               while (i < limit && bytes[i] != delByte) {
+
+               final int delimLimit = limit-delimiter.length+1;
+
+               while (i < limit) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               break;
+                       }
                        i++;
                }
                
@@ -43,7 +47,7 @@ public class FloatValueParser extends FieldParser<FloatValue> 
{
                        float value = Float.parseFloat(str);
                        reusable.setValue(value);
                        this.result = reusable;
-                       return (i == limit) ? limit : i+1;
+                       return (i == limit) ? limit : i + delimiter.length;
                }
                catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index a5d8c06..c871f4a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -32,25 +32,27 @@ public class IntParser extends FieldParser<Integer> {
        private int result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, Integer reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Integer reusable) {
                long val = 0;
                boolean neg = false;
-               
+
+               final int delimLimit = limit-delimiter.length+1;
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || ( startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
                
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                this.result = (int) (neg ? -val : val);
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
@@ -114,7 +116,7 @@ public class IntParser extends FieldParser<Integer> {
                }
                long val = 0;
                boolean neg = false;
-               
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index 42f618c..8cb8176 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -33,27 +33,29 @@ public class IntValueParser extends FieldParser<IntValue> {
        private IntValue result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, IntValue reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, IntValue reusable) {
                long val = 0;
                boolean neg = false;
-               
+
+               final int delimLimit = limit-delimiter.length+1;
+
                this.result = reusable;
-               
+
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
                
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                reusable.setValue((int) (neg ? -val : val));
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index 830fb3b..af17f15 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -28,25 +28,27 @@ public class LongParser extends FieldParser<Long> {
        private long result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, Long reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Long reusable) {
                long val = 0;
                boolean neg = false;
+
+               final int delimLimit = limit - delimiter.length + 1;
                
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
                
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                this.result = neg ? -val : val;
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
@@ -63,8 +65,8 @@ public class LongParser extends FieldParser<Long> {
                                        
                                        if (i+1 >= limit) {
                                                return limit; 
-                                       } else if (bytes[i+1] == delimiter) {
-                                               return i+2;
+                                       } else if (i+1 < delimLimit && 
delimiterNext(bytes, i+1, delimiter)) {
+                                               return i + 1 + delimiter.length;
                                        } else {
                                                
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
                                                return -1;

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 321cbcb..8b697cc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -30,10 +30,12 @@ public class LongValueParser extends FieldParser<LongValue> 
{
        private LongValue result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, LongValue reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, LongValue reusable) {
                long val = 0;
                boolean neg = false;
-               
+
+               final int delimLimit = limit-delimiter.length+1;
+
                this.result = reusable;
                
                if (bytes[startPos] == '-') {
@@ -41,16 +43,16 @@ public class LongValueParser extends FieldParser<LongValue> 
{
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
                
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                reusable.setValue(neg ? -val : val);
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
@@ -58,7 +60,7 @@ public class LongValueParser extends FieldParser<LongValue> {
                        }
                        val *= 10;
                        val += bytes[i] - 48;
-                       
+
                        // check for overflow / underflow
                        if (val < 0) {
                                // this is an overflow/underflow, unless we hit 
exactly the Long.MIN_VALUE
@@ -66,9 +68,9 @@ public class LongValueParser extends FieldParser<LongValue> {
                                        reusable.setValue(Long.MIN_VALUE);
                                        
                                        if (i+1 >= limit) {
-                                               return limit; 
-                                       } else if (bytes[i+1] == delimiter) {
-                                               return i+2;
+                                               return limit;
+                                       } else if (i+1 < delimLimit && 
delimiterNext(bytes, i+1, delimiter)) {
+                                               return i + 1 + delimiter.length;
                                        } else {
                                                
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
                                                return -1;
@@ -80,7 +82,7 @@ public class LongValueParser extends FieldParser<LongValue> {
                                }
                        }
                }
-               
+
                reusable.setValue(neg ? -val : val);
                return limit;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index 0d431df..a6f9898 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -32,25 +32,27 @@ public class ShortParser extends FieldParser<Short> {
        private short result;
 
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, Short reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Short reusable) {
                int val = 0;
                boolean neg = false;
+
+               final int delimLimit = limit-delimiter.length+1;
                
                if (bytes[startPos] == '-') {
                        neg = true;
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
                
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                this.result = (short) (neg ? -val : val);
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index 82272a6..f5168cc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -33,9 +33,11 @@ public class ShortValueParser extends 
FieldParser<ShortValue> {
        private ShortValue result;
 
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delimiter, ShortValue reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, ShortValue reusable) {
                int val = 0;
                boolean neg = false;
+
+               final int delimLimit = limit-delimiter.length+1;
                
                this.result = reusable;
                
@@ -44,16 +46,16 @@ public class ShortValueParser extends 
FieldParser<ShortValue> {
                        startPos++;
                        
                        // check for empty field with only the sign
-                       if (startPos == limit || bytes[startPos] == delimiter) {
+                       if (startPos == limit || (startPos < delimLimit && 
delimiterNext(bytes, startPos, delimiter))) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
                                return -1;
                        }
                }
                
                for (int i = startPos; i < limit; i++) {
-                       if (bytes[i] == delimiter) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                reusable.setValue((short) (neg ? -val : val));
-                               return i+1;
+                               return i + delimiter.length;
                        }
                        if (bytes[i] < 48 || bytes[i] > 57) {
                                
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index a39dbe0..bd2550e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -28,8 +28,8 @@ public class StringParser extends FieldParser<String> {
        
        private static final byte WHITESPACE_SPACE = (byte) ' ';
        private static final byte WHITESPACE_TAB = (byte) '\t';
-       
-       private static final byte QUOTE_DOUBLE = (byte) '"';
+
+       private static final byte QUOTE_CHARACTER = (byte) '"';
 
        private static enum ParserStates {
                NONE, IN_QUOTE, STOP
@@ -38,12 +38,13 @@ public class StringParser extends FieldParser<String> {
        private String result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, char 
delim, String reusable) {
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, String reusable) {
                
                int i = startPos;
-               
-               final byte delByte = (byte) delim;
                byte current;
+               boolean delimiterFound = false;
+
+               final int delimLimit = limit-delimiter.length+1;
                
                // count initial whitespace lines
                while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE 
|| current == WHITESPACE_TAB)) {
@@ -62,11 +63,14 @@ public class StringParser extends FieldParser<String> {
                        if(endOfCellPosition == limit) {
                                break;
                        }
-                       current = bytes[endOfCellPosition];
-                       if(current == delByte) {
+                       if(endOfCellPosition < delimLimit && 
delimiterNext(bytes, endOfCellPosition, delimiter)) {
                                // if we are in a quote do nothing, otherwise 
we reached the end
-                               parserState = parserState == 
ParserStates.IN_QUOTE ? parserState: ParserStates.STOP;
-                       } else if(current == QUOTE_DOUBLE) {
+                               if (parserState != ParserStates.IN_QUOTE) {
+                                       parserState = ParserStates.STOP;
+                                       delimiterFound = true;
+                               }
+                               endOfCellPosition += delimiter.length - 1;
+                       } else if(bytes[endOfCellPosition] == QUOTE_CHARACTER) {
                                // we entered a quote
                                if(parserState == ParserStates.IN_QUOTE) {
                                        // we end the quote
@@ -77,6 +81,7 @@ public class StringParser extends FieldParser<String> {
                                }
                        }
                }
+               int delimCorrection = delimiterFound ? delimiter.length : 1;
 
                if(parserState == ParserStates.IN_QUOTE) {
                        // exited due to line end without quote termination
@@ -84,54 +89,51 @@ public class StringParser extends FieldParser<String> {
                        return -1;
                }
 
-
                // boundary of the cell is now
                // i --> endOfCellPosition
 
                // first none whitespace character
-               if (i < limit && bytes[i] == QUOTE_DOUBLE) {
+               if (i < limit && bytes[i] == QUOTE_CHARACTER) {
 
                        // check if there are characters at the end
-                       current = bytes[endOfCellPosition - 1];
+                       current = bytes[endOfCellPosition - delimCorrection];
 
                        // if the character preceding the end of the cell is 
not a WHITESPACE or the end QUOTE_DOUBLE
                        // there are unquoted characters at the end
 
-                       if (!(current == WHITESPACE_SPACE || current == 
WHITESPACE_TAB || current == QUOTE_DOUBLE)) {
+                       if (!(current == WHITESPACE_SPACE || current == 
WHITESPACE_TAB || current == QUOTE_CHARACTER)) {
                                
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
                                return -1;      // illegal case of 
non-whitespace characters trailing
                        }
 
                        // skip trailing whitespace after quote .. by moving 
the cursor backwards
                        int skipAtEnd = 0;
-                       while (bytes[endOfCellPosition - 1 - skipAtEnd] == 
WHITESPACE_SPACE || bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_TAB) 
{
+                       while (bytes[endOfCellPosition - delimCorrection - 
skipAtEnd] == WHITESPACE_SPACE ||
+                                       bytes[endOfCellPosition - 
delimCorrection - skipAtEnd] == WHITESPACE_TAB) {
                                skipAtEnd++;
                        }
 
                        // now unescape
                        boolean notEscaped = true;
                        int endOfContent = i + 1;
-                       for(int counter = endOfContent; counter < 
endOfCellPosition - skipAtEnd; counter++) {
-                               notEscaped = bytes[counter] != QUOTE_DOUBLE || 
!notEscaped;
+                       for(int counter = endOfContent; counter < 
endOfCellPosition - delimCorrection - skipAtEnd; counter++) {
+                               notEscaped = bytes[counter] != QUOTE_CHARACTER 
|| !notEscaped;
                                if (notEscaped) {
                                        // realign
                                        bytes[endOfContent++] = bytes[counter];
                                }
                        }
-
                        this.result = new String(bytes, i + 1, endOfContent - i 
- 1);
-
                        return (endOfCellPosition == limit ? limit : 
endOfCellPosition + 1);
                }
                else {
                        // unquoted string
-
                        // set from the beginning. unquoted strings include the 
leading whitespaces
-                       this.result = new String(bytes, i, endOfCellPosition - 
i);
+                       this.result = new String(bytes, i, endOfCellPosition - 
i - (delimCorrection - 1));
                        return (endOfCellPosition == limit ? limit : 
endOfCellPosition + 1);
                }
        }
-       
+
        @Override
        public String createValue() {
                return "";

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
index 8d0febf..7fd7b54 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
@@ -36,41 +36,45 @@ public class StringValueParser extends 
FieldParser<StringValue> {
        private static final byte QUOTE_DOUBLE = (byte) '"';
        
        private StringValue result;
-       
+
        @Override
-       public int parseField(byte[] bytes, int startPos, int length, char 
delim, StringValue reusable) {
-               
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, StringValue reusable) {
+
                this.result = reusable;
-               
                int i = startPos;
-               
-               final byte delByte = (byte) delim;
                byte current;
-               
+
+               final int delimLimit = limit-delimiter.length+1;
+
                // count initial whitespace lines
-               while (i < length && ((current = bytes[i]) == WHITESPACE_SPACE 
|| current == WHITESPACE_TAB)) {
+               while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE 
|| current == WHITESPACE_TAB)) {
                        i++;
                }
                
                // first none whitespace character
-               if (i < length && bytes[i] == QUOTE_DOUBLE) {
+               if (i < limit && bytes[i] == QUOTE_DOUBLE) {
                        // quoted string
                        i++; // the quote
                        
                        // we count only from after the quote
                        int quoteStart = i;
-                       while (i < length && bytes[i] != QUOTE_DOUBLE) {
+                       while (i < limit && bytes[i] != QUOTE_DOUBLE) {
                                i++;
                        }
                        
-                       if (i < length) {
+                       if (i < limit) {
                                // end of the string
                                reusable.setValueAscii(bytes, quoteStart, 
i-quoteStart);
                                
                                i++; // the quote
                                
-                               // skip trailing whitespace characters 
-                               while (i < length && (current = bytes[i]) != 
delByte) {
+                               // skip trailing whitespace characters
+                               while (i < limit) {
+
+                                       if (i < delimLimit && 
delimiterNext(bytes, i, delimiter)) {
+                                               return i+delimiter.length;
+                                       }
+                                       current = bytes[i];
                                        if (current == WHITESPACE_SPACE || 
current == WHITESPACE_TAB) {
                                                i++;
                                        }
@@ -79,8 +83,10 @@ public class StringValueParser extends 
FieldParser<StringValue> {
                                                return -1;      // illegal case 
of non-whitespace characters trailing
                                        }
                                }
-                               
-                               return (i == length ? length : i+1);
+                               if( i > limit ){
+                                       i--;
+                               }
+                               return (i == limit ? limit : i + 
delimiter.length);
                        } else {
                                // exited due to line end without quote 
termination
                                
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
@@ -88,14 +94,16 @@ public class StringValueParser extends 
FieldParser<StringValue> {
                        }
                }
                else {
-                       // unquoted string
-                       while (i < length && bytes[i] != delByte) {
+                       // unquoted string -delim.length
+                       while (i < limit) {
+                               if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                                       break;
+                               }
                                i++;
                        }
-                       
                        // set from the beginning. unquoted strings include the 
leading whitespaces
                        reusable.setValueAscii(bytes, startPos, i-startPos);
-                       return (i == length ? length : i+1);
+                       return (i == limit ? limit : i + delimiter.length);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index 4d93070..3749645 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -88,7 +88,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(IntValue.class, 
IntValue.class, IntValue.class, IntValue.class, IntValue.class);
                        
                        format.configure(parameters);
@@ -128,7 +128,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(IntValue.class, 
IntValue.class);
                        
                        format.configure(parameters);
@@ -160,12 +160,14 @@ public class GenericCsvInputFormatTest {
        @Test
        public void testSparseParse() {
                try {
-                       final String fileContent = 
"111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
+                       final String fileContent =
+                                       
"111|222|333|444|555|666|777|888|999|000|\n"+
+                                       
"000|999|888|777|666|555|444|333|222|111|";
                        final FileInputSplit split = 
createTempFile(fileContent);       
                
                        final Configuration parameters = new Configuration();
                        
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(IntValue.class, null, null, 
IntValue.class, null, null, null, IntValue.class);
                        
                        format.configure(parameters);
@@ -203,7 +205,7 @@ public class GenericCsvInputFormatTest {
 
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter(',');
+                       format.setFieldDelimiter(",");
                        format.setFieldTypesGeneric(LongValue.class, 
LongValue.class, LongValue.class);
                        format.configure(parameters);
                        format.open(split);
@@ -241,7 +243,7 @@ public class GenericCsvInputFormatTest {
 
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldsGeneric(new int[] { 0, 3, 7 },
                                (Class<? extends Value>[]) new Class[] { 
IntValue.class, IntValue.class, IntValue.class });
                        format.configure(parameters);
@@ -269,7 +271,62 @@ public class GenericCsvInputFormatTest {
                        fail("Test erroneous");
                }
        }
-       
+
+       @SuppressWarnings("unchecked") // covers the raw Class[] -> Class<? extends Value>[] cast below
+       @Test
+       public void testSparseParseWithIndicesMultiCharDelimiter() { // reads fields 0, 3 and 7 of each record when fields are separated by "|-|"
+               try {
+                       final String fileContent = // four records of ten fields each; records 1 and 3 end with a trailing delimiter, records 2 and 4 do not
+                                       
"111|-|222|-|333|-|444|-|555|-|666|-|777|-|888|-|999|-|000|-|\n"+
+                                       
"000|-|999|-|888|-|777|-|666|-|555|-|444|-|333|-|222|-|111\n"+
+                                       
"555|-|999|-|888|-|111|-|666|-|555|-|444|-|777|-|222|-|111|-|\n"+
+                                       
"22222|-|99999|-|8|-|99999999|-|6666666|-|5|-|4444|-|8|-|22222|-|1\n";
+
+                       final FileInputSplit split = 
createTempFile(fileContent);
+
+                       final Configuration parameters = new Configuration();
+
+                       format.setFieldDelimiter("|-|"); // three-character field delimiter (the feature under test)
+                       format.setFieldsGeneric(new int[] { 0, 3, 7 }, // zero-based positions of the fields to extract; all other fields are skipped
+                                       (Class<? extends Value>[]) new Class[] 
{ IntValue.class, IntValue.class, IntValue.class });
+                       format.configure(parameters);
+                       format.open(split);
+
+                       Value[] values = createIntValues(3); // helper defined elsewhere in this test class; presumably three reusable IntValue holders - confirm
+
+                       values = format.nextRecord(values); // record 1 (ends with a trailing delimiter)
+                       assertNotNull(values);
+                       assertEquals(111, ((IntValue) values[0]).getValue());
+                       assertEquals(444, ((IntValue) values[1]).getValue());
+                       assertEquals(888, ((IntValue) values[2]).getValue());
+
+                       values = format.nextRecord(values); // record 2 (no trailing delimiter)
+                       assertNotNull(values);
+                       assertEquals(000, ((IntValue) values[0]).getValue()); // 000 is an octal int literal, i.e. 0
+                       assertEquals(777, ((IntValue) values[1]).getValue());
+                       assertEquals(333, ((IntValue) values[2]).getValue());
+
+                       values = format.nextRecord(values); // record 3 (ends with a trailing delimiter)
+                       assertNotNull(values);
+                       assertEquals(555, ((IntValue) values[0]).getValue());
+                       assertEquals(111, ((IntValue) values[1]).getValue());
+                       assertEquals(777, ((IntValue) values[2]).getValue());
+
+                       values = format.nextRecord(values); // record 4: fields of varying width, no trailing delimiter
+                       assertNotNull(values);
+                       assertEquals(22222, ((IntValue) values[0]).getValue());
+                       assertEquals(99999999, ((IntValue) 
values[1]).getValue());
+                       assertEquals(8, ((IntValue) values[2]).getValue());
+
+                       assertNull(format.nextRecord(values)); // the input holds exactly four records
+                       assertTrue(format.reachedEnd());
+               } catch (Exception ex) { // any exception is reported and turned into a test failure
+                       System.err.println(ex.getMessage());
+                       ex.printStackTrace();
+                       fail("Test erroneous");
+               }
+       }
+
        @Test
        public void testReadTooShortInput() throws IOException {
                try {
@@ -277,7 +334,7 @@ public class GenericCsvInputFormatTest {
                        final FileInputSplit split = 
createTempFile(fileContent);       
                
                        final Configuration parameters = new Configuration();
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(IntValue.class, 
IntValue.class, IntValue.class, IntValue.class, IntValue.class);
                        
                        format.configure(parameters);
@@ -305,7 +362,7 @@ public class GenericCsvInputFormatTest {
                        final FileInputSplit split = 
createTempFile(fileContent);       
                
                        final Configuration parameters = new Configuration();
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(IntValue.class, 
IntValue.class, IntValue.class, IntValue.class, IntValue.class);
                        format.setLenient(true);
                        
@@ -331,7 +388,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(StringValue.class, 
IntValue.class, StringValue.class, IntValue.class);
                        
                        format.configure(parameters);
@@ -362,7 +419,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(StringValue.class, 
IntValue.class, StringValue.class, IntValue.class);
                        format.setLenient(true);
                        
@@ -390,7 +447,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(StringValue.class, null, 
IntValue.class);
                        format.setLenient(true);
                        
@@ -417,7 +474,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(StringValue.class, 
StringValue.class, StringValue.class);
                        
                        format.configure(parameters);
@@ -460,7 +517,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(IntValue.class, 
StringValue.class, IntValue.class, StringValue.class);
                        format.setSkipFirstLineAsHeader(true);
                        
@@ -492,7 +549,7 @@ public class GenericCsvInputFormatTest {
                
                        final Configuration parameters = new Configuration();
 
-                       format.setFieldDelimiter('|');
+                       format.setFieldDelimiter("|");
                        format.setFieldTypesGeneric(IntValue.class, 
StringValue.class, IntValue.class, StringValue.class);
                        format.setSkipFirstLineAsHeader(true);
                        
@@ -516,7 +573,6 @@ public class GenericCsvInputFormatTest {
                        fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
                }
        }
-       
 
        private FileInputSplit createTempFile(String content) throws 
IOException {
                this.tempFile = File.createTempFile("test_contents", "tmp");
@@ -558,4 +614,5 @@ public class GenericCsvInputFormatTest {
                        return parseRecord(target, bytes, offset, numBytes) ? 
target : null;
                }
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index ff7f473..37d6903 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -28,14 +28,14 @@ public class ByteParserTest extends ParserTestBase<Byte> {
        @Override
        public String[] getValidTestValues() {
                return new String[] {
-                       "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), 
String.valueOf(Byte.MIN_VALUE)
+                       "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), 
String.valueOf(Byte.MIN_VALUE), "19"
                };
        }
        
        @Override
        public Byte[] getValidTestResults() {
                return new Byte[] {
-                       (byte) 0, (byte)1, (byte)76, (byte) -66, 
Byte.MAX_VALUE, Byte.MIN_VALUE
+                       (byte) 0, (byte)1, (byte)76, (byte) -66, 
Byte.MAX_VALUE, Byte.MIN_VALUE, (byte)19
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
index 3589f2a..a6c315a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
@@ -29,7 +29,7 @@ public class ByteValueParserTest extends 
ParserTestBase<ByteValue> {
        @Override
        public String[] getValidTestValues() {
                return new String[] {
-                       "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), 
String.valueOf(Byte.MIN_VALUE)
+                       "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), 
String.valueOf(Byte.MIN_VALUE), "19"
                };
        }
        
@@ -37,7 +37,7 @@ public class ByteValueParserTest extends 
ParserTestBase<ByteValue> {
        public ByteValue[] getValidTestResults() {
                return new ByteValue[] {
                        new ByteValue((byte) 0), new ByteValue((byte) 1), new 
ByteValue((byte) 76), new ByteValue((byte) -66),
-                       new ByteValue(Byte.MAX_VALUE), new 
ByteValue(Byte.MIN_VALUE)
+                       new ByteValue(Byte.MAX_VALUE), new 
ByteValue(Byte.MIN_VALUE), new ByteValue((byte) 19)
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
index 92f6b21..71e78a0 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
@@ -32,7 +32,7 @@ public class DoubleParserTest extends ParserTestBase<Double> {
                        String.valueOf(Double.MAX_VALUE), 
String.valueOf(Double.MIN_VALUE),
                        String.valueOf(Double.NEGATIVE_INFINITY), 
String.valueOf(Double.POSITIVE_INFINITY),
                        String.valueOf(Double.NaN),
-                       "1.234E2", "1.234e3", "1.234E-2"
+                       "1.234E2", "1.234e3", "1.234E-2", "1239"
                };
        }
        
@@ -43,7 +43,7 @@ public class DoubleParserTest extends ParserTestBase<Double> {
                        Double.MAX_VALUE, Double.MIN_VALUE,
                        Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY,
                        Double.NaN,
-                       1.234E2, 1.234e3, 1.234E-2
+                       1.234E2, 1.234e3, 1.234E-2, 1239d
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
index 21a3e03..120dfac 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
@@ -33,7 +33,7 @@ public class DoubleValueParserTest extends 
ParserTestBase<DoubleValue> {
                        String.valueOf(Double.MAX_VALUE), 
String.valueOf(Double.MIN_VALUE),
                        String.valueOf(Double.NEGATIVE_INFINITY), 
String.valueOf(Double.POSITIVE_INFINITY),
                        String.valueOf(Double.NaN),
-                       "1.234E2", "1.234e3", "1.234E-2"
+                       "1.234E2", "1.234e3", "1.234E-2", "1239"
                };
        }
        
@@ -45,7 +45,7 @@ public class DoubleValueParserTest extends 
ParserTestBase<DoubleValue> {
                        new DoubleValue(Double.MAX_VALUE), new 
DoubleValue(Double.MIN_VALUE),
                        new DoubleValue(Double.NEGATIVE_INFINITY), new 
DoubleValue(Double.POSITIVE_INFINITY),
                        new DoubleValue(Double.NaN),
-                       new DoubleValue(1.234E2), new DoubleValue(1.234e3), new 
DoubleValue(1.234E-2)
+                       new DoubleValue(1.234E2), new DoubleValue(1.234e3), new 
DoubleValue(1.234E-2), new DoubleValue(1239d)
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
index 888efca..3c450a5 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
@@ -32,7 +32,7 @@ public class FloatParserTest extends ParserTestBase<Float> {
                        String.valueOf(Float.MAX_VALUE), 
String.valueOf(Float.MIN_VALUE),
                        String.valueOf(Float.NEGATIVE_INFINITY), 
String.valueOf(Float.POSITIVE_INFINITY),
                        String.valueOf(Float.NaN),
-                       "1.234E2", "1.234e3", "1.234E-2"
+                       "1.234E2", "1.234e3", "1.234E-2", "1239"
                };
        }
        
@@ -43,7 +43,7 @@ public class FloatParserTest extends ParserTestBase<Float> {
                        Float.MAX_VALUE, Float.MIN_VALUE,
                        Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY,
                        Float.NaN,
-                       1.234E2f, 1.234e3f, 1.234E-2f
+                       1.234E2f, 1.234e3f, 1.234E-2f, 1239f
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
index 06b4194..be5b5b8 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
@@ -33,7 +33,7 @@ public class FloatValueParserTest extends 
ParserTestBase<FloatValue> {
                        String.valueOf(Float.MAX_VALUE), 
String.valueOf(Float.MIN_VALUE),
                        String.valueOf(Float.NEGATIVE_INFINITY), 
String.valueOf(Float.POSITIVE_INFINITY),
                        String.valueOf(Float.NaN),
-                       "1.234E2", "1.234e3", "1.234E-2"
+                       "1.234E2", "1.234e3", "1.234E-2", "1239"
                };
        }
        
@@ -45,7 +45,7 @@ public class FloatValueParserTest extends 
ParserTestBase<FloatValue> {
                        new FloatValue(Float.MAX_VALUE), new 
FloatValue(Float.MIN_VALUE),
                        new FloatValue(Float.NEGATIVE_INFINITY), new 
FloatValue(Float.POSITIVE_INFINITY),
                        new FloatValue(Float.NaN),
-                       new FloatValue(1.234E2f), new FloatValue(1.234e3f), new 
FloatValue(1.234E-2f)
+                       new FloatValue(1.234E2f), new FloatValue(1.234e3f), new 
FloatValue(1.234E-2f), new FloatValue(1239f)
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
index 828eb71..6e1d4db 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
@@ -28,14 +28,14 @@ public class IntParserTest extends ParserTestBase<Integer> {
        @Override
        public String[] getValidTestValues() {
                return new String[] {
-                       "0", "1", "576", "-877678", 
String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE)
+                       "0", "1", "576", "-877678", 
String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE), "1239"
                };
        }
        
        @Override
        public Integer[] getValidTestResults() {
                return new Integer[] {
-                       0, 1, 576, -877678, Integer.MAX_VALUE, Integer.MIN_VALUE
+                       0, 1, 576, -877678, Integer.MAX_VALUE, 
Integer.MIN_VALUE, 1239
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
index 63aabee..e32f704 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
@@ -29,7 +29,7 @@ public class IntValueParserTest extends 
ParserTestBase<IntValue> {
        @Override
        public String[] getValidTestValues() {
                return new String[] {
-                       "0", "1", "576", "-877678", 
String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE)
+                       "0", "1", "576", "-877678", 
String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE), "1239"
                };
        }
        
@@ -37,7 +37,7 @@ public class IntValueParserTest extends 
ParserTestBase<IntValue> {
        public IntValue[] getValidTestResults() {
                return new IntValue[] {
                        new IntValue(0), new IntValue(1), new IntValue(576), 
new IntValue(-877678),
-                       new IntValue(Integer.MAX_VALUE), new 
IntValue(Integer.MIN_VALUE)
+                       new IntValue(Integer.MAX_VALUE), new 
IntValue(Integer.MIN_VALUE), new IntValue(1239)
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
index da78d89..4dd116b 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
@@ -29,14 +29,15 @@ public class LongParserTest extends ParserTestBase<Long> {
        public String[] getValidTestValues() {
                return new String[] {
                        "0", "1", "576", "-877678", 
String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE),
-                       String.valueOf(Long.MAX_VALUE), 
String.valueOf(Long.MIN_VALUE), "7656"
+                       String.valueOf(Long.MAX_VALUE), 
String.valueOf(Long.MIN_VALUE), "7656", "1239"
                };
        }
        
        @Override
        public Long[] getValidTestResults() {
                return new Long[] {
-                       0L, 1L, 576L, -877678L, (long) Integer.MAX_VALUE, 
(long) Integer.MIN_VALUE, Long.MAX_VALUE, Long.MIN_VALUE, 7656L
+                       0L, 1L, 576L, -877678L, (long) Integer.MAX_VALUE, 
(long) Integer.MIN_VALUE, Long.MAX_VALUE, Long.MIN_VALUE,
+                       7656L, 1239L
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
index 2b44cc0..fac6f42 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
@@ -30,7 +30,7 @@ public class LongValueParserTest extends 
ParserTestBase<LongValue> {
        public String[] getValidTestValues() {
                return new String[] {
                        "0", "1", "576", "-877678", 
String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE),
-                       String.valueOf(Long.MAX_VALUE), 
String.valueOf(Long.MIN_VALUE), "7656"
+                       String.valueOf(Long.MAX_VALUE), 
String.valueOf(Long.MIN_VALUE), "7656", "1239"
                };
        }
        
@@ -39,7 +39,7 @@ public class LongValueParserTest extends 
ParserTestBase<LongValue> {
                return new LongValue[] {
                        new LongValue(0L), new LongValue(1L), new 
LongValue(576L), new LongValue(-877678L),
                        new LongValue((long) Integer.MAX_VALUE), new 
LongValue((long) Integer.MIN_VALUE),
-                       new LongValue(Long.MAX_VALUE), new 
LongValue(Long.MIN_VALUE), new LongValue(7656L)
+                       new LongValue(Long.MAX_VALUE), new 
LongValue(Long.MIN_VALUE), new LongValue(7656L), new LongValue(1239L)
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index 00423d5..fb56add 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -79,18 +79,34 @@ public abstract class ParserTestBase<T> {
                        T[] results = getValidTestResults();
                        
                        for (int i = 0; i < testValues.length; i++) {
+
+                               FieldParser<T> parser1 = getParser();
+                               FieldParser<T> parser2 = getParser();
+                               FieldParser<T> parser3 = getParser();
                                
-                               FieldParser<T> parser = getParser();
-                               
-                               byte[] bytes = testValues[i].getBytes();
-                               int numRead = parser.parseField(bytes, 0, 
bytes.length, '|', parser.createValue());
-                               
-                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", numRead != -1);
-                               assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
-                               
-                               T result = parser.getLastResult();
-                               
-                               assertEquals("Parser parsed wrong.", 
results[i], result);
+                               byte[] bytes1 = testValues[i].getBytes();
+                               byte[] bytes2 = testValues[i].getBytes();
+                               byte[] bytes3 = testValues[i].getBytes();
+
+                               int numRead1 = parser1.parseField(bytes1, 0, 
bytes1.length, new byte[] {'|'}, parser1.createValue());
+                               int numRead2 = parser2.parseField(bytes2, 0, 
bytes2.length, new byte[] {'&', '&'}, parser2.createValue());
+                               int numRead3 = parser3.parseField(bytes3, 0, 
bytes3.length, new byte[] {'9','9','9'}, parser3.createValue());
+
+                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", numRead1 != -1);
+                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", numRead2 != -1);
+                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", numRead3 != -1);
+
+                               assertEquals("Invalid number of bytes read 
returned.", bytes1.length, numRead1);
+                               assertEquals("Invalid number of bytes read 
returned.", bytes2.length, numRead2);
+                               assertEquals("Invalid number of bytes read 
returned.", bytes3.length, numRead3);
+
+                               T result1 = parser1.getLastResult();
+                               T result2 = parser2.getLastResult();
+                               T result3 = parser3.getLastResult();
+
+                               assertEquals("Parser parsed wrong. 
"+testValues[i], results[i], result1);
+                               assertEquals("Parser parsed wrong. 
"+testValues[i], results[i], result2);
+                               assertEquals("Parser parsed wrong. 
"+testValues[i], results[i], result3);
                        }
                        
                }
@@ -100,35 +116,75 @@ public abstract class ParserTestBase<T> {
                        fail("Test erroneous: " + e.getMessage());
                }
        }
+
+       @Test
+       public void testValidStringInIsolationWithEndDelimiter() {
+               try {
+                       String[] testValues = getValidTestValues();
+                       T[] results = getValidTestResults();
+
+                       for (int i = 0; i < testValues.length; i++) {
+
+                               FieldParser<T> parser1 = getParser();
+                               FieldParser<T> parser2 = getParser();
+
+                               String testVal1 = testValues[i] + "|";
+                               String testVal2 = testValues[i] + "&&&&";
+
+                               byte[] bytes1 = testVal1.getBytes();
+                               byte[] bytes2 = testVal2.getBytes();
+
+                               int numRead1 = parser1.parseField(bytes1, 0, 
bytes1.length, new byte[] {'|'}, parser1.createValue());
+                               int numRead2 = parser2.parseField(bytes2, 0, 
bytes2.length, new byte[] {'&', '&','&', '&'}, parser2.createValue());
+
+                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", numRead1 != -1);
+                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", numRead2 != -1);
+
+                               assertEquals("Invalid number of bytes read 
returned.", bytes1.length, numRead1);
+                               assertEquals("Invalid number of bytes read 
returned.", bytes2.length, numRead2);
+
+                               T result1 = parser1.getLastResult();
+                               T result2 = parser2.getLastResult();
+
+                               assertEquals("Parser parsed wrong.", 
results[i], result1);
+                               assertEquals("Parser parsed wrong.", 
results[i], result2);
+                       }
+
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test erroneous: " + e.getMessage());
+               }
+       }
        
        @Test
        public void testConcatenated() {
                try {
                        String[] testValues = getValidTestValues();
                        T[] results = getValidTestResults();
-                       
-                       byte[] allBytesWithDelimiter = concatenate(testValues, 
'|', true);
-                       byte[] allBytesNoDelimiterEnd = concatenate(testValues, 
',', false);
-                       
+                       byte[] allBytesWithDelimiter = concatenate(testValues, 
new char[] {'|'}, true);
+                       byte[] allBytesNoDelimiterEnd = concatenate(testValues, 
new char[] {','}, false);
+
                        FieldParser<T> parser1 = getParser();
                        FieldParser<T> parser2 = getParser();
-                       
+
                        T val1 = parser1.createValue();
                        T val2 = parser2.createValue();
-                       
+
                        int pos1 = 0;
                        int pos2 = 0;
-                       
+
                        for (int i = 0; i < results.length; i++) {
-                               pos1 = 
parser1.parseField(allBytesWithDelimiter, pos1, allBytesWithDelimiter.length, 
'|', val1);
-                               pos2 = 
parser2.parseField(allBytesNoDelimiterEnd, pos2, allBytesNoDelimiterEnd.length, 
',', val2);
-                               
+                               pos1 = 
parser1.parseField(allBytesWithDelimiter, pos1, allBytesWithDelimiter.length, 
new byte[] {'|'}, val1);
+                               pos2 = 
parser2.parseField(allBytesNoDelimiterEnd, pos2, allBytesNoDelimiterEnd.length, 
new byte[] {','}, val2);
+
                                assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", pos1 != -1);
                                assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", pos2 != -1);
-                               
+
                                T result1 = parser1.getLastResult();
                                T result2 = parser2.getLastResult();
-                               
+
                                assertEquals("Parser parsed wrong.", 
results[i], result1);
                                assertEquals("Parser parsed wrong.", 
results[i], result2);
                        }
@@ -139,6 +195,43 @@ public abstract class ParserTestBase<T> {
                        fail("Test erroneous: " + e.getMessage());
                }
        }
+
+       @Test
+       public void testConcatenatedMultiCharDelimiter() {
+               try {
+                       String[] testValues = getValidTestValues();
+                       T[] results = getValidTestResults();
+                       byte[] allBytesWithDelimiter = concatenate(testValues, 
new char[] {'&','&', '&', '&'}, true);
+                       byte[] allBytesNoDelimiterEnd = concatenate(testValues, 
new char[] {'9', '9', '9'}, false);
+
+                       FieldParser<T> parser1 = getParser();
+                       FieldParser<T> parser2 = getParser();
+
+                       T val1 = parser1.createValue();
+                       T val2 = parser2.createValue();
+
+                       int pos1 = 0;
+                       int pos2 = 0;
+
+                       for (int i = 0; i < results.length; i++) {
+                               pos1 = 
parser1.parseField(allBytesWithDelimiter, pos1,  allBytesWithDelimiter.length,  
 new byte[] {'&','&','&','&'}, val1);
+                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", pos1 != -1);
+                               T result1 = parser1.getLastResult();
+                               assertEquals("Parser parsed wrong.", 
results[i], result1);
+
+                               pos2 = 
parser2.parseField(allBytesNoDelimiterEnd, pos2, allBytesNoDelimiterEnd.length, 
 new byte[] {'9','9','9'}, val2);
+                               assertTrue("Parser declared the valid value " + 
testValues[i] + " as invalid.", pos2 != -1);
+                               T result2 = parser2.getLastResult();
+                               assertEquals("Parser parsed wrong.", 
results[i], result2);
+
+                       }
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test erroneous: " + e.getMessage());
+               }
+       }
        
        @Test
        public void testInValidStringInIsolation() {
@@ -150,7 +243,7 @@ public abstract class ParserTestBase<T> {
                                FieldParser<T> parser = getParser();
                                
                                byte[] bytes = testValues[i].getBytes();
-                               int numRead = parser.parseField(bytes, 0, 
bytes.length, '|', parser.createValue());
+                               int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[] {'|'}, parser.createValue());
                                
                                assertTrue("Parser accepted the invalid value " 
+ testValues[i] + ".", numRead == -1);
                        }
@@ -183,12 +276,12 @@ public abstract class ParserTestBase<T> {
                                testLine[splitPoint] = invalid;
                                System.arraycopy(validValues, splitPoint, 
testLine, splitPoint + 1, validValues.length - splitPoint);
                                
-                               byte[] bytes = concatenate(testLine, '%', true);
+                               byte[] bytes = concatenate(testLine, new char[] 
{'%'}, true);
                                
                                // read the valid parts
                                int pos = 0;
                                for (int i = 0; i < splitPoint; i++) {
-                                       pos = parser.parseField(bytes, pos, 
bytes.length, '%', value);
+                                       pos = parser.parseField(bytes, pos, 
bytes.length, new byte[] {'%'}, value);
                                        
                                        assertTrue("Parser declared the valid 
value " + validValues[i] + " as invalid.", pos != -1);
                                        T result = parser.getLastResult();
@@ -196,7 +289,7 @@ public abstract class ParserTestBase<T> {
                                }
                                
                                // fail on the invalid part
-                               pos = parser.parseField(bytes, pos, 
bytes.length, '%', value);
+                               pos = parser.parseField(bytes, pos, 
bytes.length, new byte[] {'%'}, value);
                                assertTrue("Parser accepted the invalid value " 
+ invalid + ".", pos == -1);
                        }
                }
@@ -279,14 +372,14 @@ public abstract class ParserTestBase<T> {
                }
        }
        
-       private static byte[] concatenate(String[] values, char delimiter, 
boolean delimiterAtEnd) {
+       private static byte[] concatenate(String[] values, char[] delimiter, 
boolean delimiterAtEnd) {
                int len = 0;
                for (String s : values) {
-                       len += s.length() + 1;
+                       len += s.length() + delimiter.length;
                }
                
                if (!delimiterAtEnd) {
-                       len--;
+                       len-=delimiter.length;
                }
                
                int currPos = 0;
@@ -301,10 +394,12 @@ public abstract class ParserTestBase<T> {
                        currPos += numBytes;
                        
                        if (delimiterAtEnd || i < values.length-1) {
-                               result[currPos++] = (byte) delimiter;
+                               for(int j = 0; j < delimiter.length; j++)
+                               result[currPos++] = (byte) delimiter[j];
                        }
                }
                
                return result;
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
index d9feda0..3f4cd02 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
@@ -28,14 +28,14 @@ public class ShortParserTest extends ParserTestBase<Short> {
        @Override
        public String[] getValidTestValues() {
                return new String[] {
-                       "0", "1", "576", "-8778", 
String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE)
+                       "0", "1", "576", "-8778", 
String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE), "119"
                };
        }
        
        @Override
        public Short[] getValidTestResults() {
                return new Short[] {
-                       (short) 0, (short)1, (short)576, (short) -8778, 
Short.MAX_VALUE, Short.MIN_VALUE
+                       (short) 0, (short)1, (short)576, (short) -8778, 
Short.MAX_VALUE, Short.MIN_VALUE, (short) 119
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
index d7db017..44f1589 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
@@ -29,7 +29,7 @@ public class ShortValueParserTest extends 
ParserTestBase<ShortValue> {
        @Override
        public String[] getValidTestValues() {
                return new String[] {
-                       "0", "1", "576", "-8778", 
String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE)
+                       "0", "1", "576", "-8778", 
String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE), "1239"
                };
        }
        
@@ -37,7 +37,8 @@ public class ShortValueParserTest extends 
ParserTestBase<ShortValue> {
        public ShortValue[] getValidTestResults() {
                return new ShortValue[] {
                        new ShortValue((short) 0), new ShortValue((short) 1), 
new ShortValue((short) 576),
-                       new ShortValue((short) -8778), new 
ShortValue(Short.MAX_VALUE), new ShortValue(Short.MIN_VALUE)
+                       new ShortValue((short) -8778), new 
ShortValue(Short.MAX_VALUE), new ShortValue(Short.MIN_VALUE),
+                       new ShortValue((short)1239)
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java
index 0d68730..702f985 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java
@@ -29,19 +29,19 @@ public class StringParserTest extends 
ParserTestBase<String> {
        public String[] getValidTestValues() {
                return new String[] {
                        "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", 
"\"jklmno\"", 
-                       "\"ab,cde|fg\"", "\"hij|m|n|op\"",
+                       "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"",
                        "  \"abcdefgh\"", "     \"i\"\t\t\t", "\t \t\"jklmno\"  
",
-                       "  \"     abcd    \" \t "
+                       "  \"     abcd    \" \t ", "Hello9"
                };
        }
        
        @Override
        public String[] getValidTestResults() {
                return new String[] {
-                       "abcdefgh", "i", "jklmno", "abcdefgh", "i", "jklmno", 
-                       "ab,cde|fg", "hij|m|n|op",
+                       "abcdefgh", "i", "jklmno", "abcdefgh", "i", "jklmno",
+                       "ab,cde|fg", "hij|m|n|op", "hij&&m&&n&&op",
                        "abcdefgh", "i", "jklmno",
-                       "     abcd    "
+                       "     abcd    ", "Hello9"
                };
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
index 35332d1..66ad32d 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
@@ -30,9 +30,9 @@ public class StringValueParserTest extends 
ParserTestBase<StringValue> {
        public String[] getValidTestValues() {
                return new String[] {
                        "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", 
"\"jklmno\"", 
-                       "\"ab,cde|fg\"", "\"hij|m|n|op\"",
+                       "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"",
                        "  \"abcdefgh\"", "     \"i\"\t\t\t", "\t \t\"jklmno\"  
",
-                       "  \"     abcd    \" \t "
+                       "  \"     abcd    \" \t ", "Hello9"
                };
        }
        
@@ -41,9 +41,9 @@ public class StringValueParserTest extends 
ParserTestBase<StringValue> {
                return new StringValue[] {
                        new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"),
                        new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"), 
-                       new StringValue("ab,cde|fg"), new 
StringValue("hij|m|n|op"),
+                       new StringValue("ab,cde|fg"), new 
StringValue("hij|m|n|op"), new StringValue("hij&&m&&n&&op"),
                        new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"),
-                       new StringValue("     abcd    ")
+                       new StringValue("     abcd    "), new 
StringValue("Hello9")
                };
        }
 

Reply via email to