[FLINK-1168] Add support for multi-character field delimiters in the CSV input formats (CsvInputFormat, GenericCsvInputFormat). This commit includes parts of Cbro's pull request and subsumes PR #247.
This closes #247 This closes #264 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0548a93d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0548a93d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0548a93d Branch: refs/heads/master Commit: 0548a93dfc555a5403590f147d4850c730facaf6 Parents: 06b2acf Author: Fabian Hueske <[email protected]> Authored: Mon Oct 20 15:18:20 2014 +0200 Committer: Fabian Hueske <[email protected]> Committed: Mon Jan 26 14:50:44 2015 +0100 ---------------------------------------------------------------------- .../flink/api/avro/AvroOutputFormatTest.java | 2 +- .../program/ExecutionPlanCreationTest.java | 2 +- .../api/common/io/DelimitedInputFormat.java | 62 +------- .../api/common/io/GenericCsvInputFormat.java | 52 +++--- .../apache/flink/types/parser/ByteParser.java | 12 +- .../flink/types/parser/ByteValueParser.java | 13 +- .../apache/flink/types/parser/DoubleParser.java | 12 +- .../flink/types/parser/DoubleValueParser.java | 14 +- .../apache/flink/types/parser/FieldParser.java | 33 +++- .../apache/flink/types/parser/FloatParser.java | 14 +- .../flink/types/parser/FloatValueParser.java | 14 +- .../apache/flink/types/parser/IntParser.java | 14 +- .../flink/types/parser/IntValueParser.java | 14 +- .../apache/flink/types/parser/LongParser.java | 14 +- .../flink/types/parser/LongValueParser.java | 22 +-- .../apache/flink/types/parser/ShortParser.java | 10 +- .../flink/types/parser/ShortValueParser.java | 10 +- .../apache/flink/types/parser/StringParser.java | 44 +++--- .../flink/types/parser/StringValueParser.java | 46 +++--- .../common/io/GenericCsvInputFormatTest.java | 89 +++++++++-- .../flink/types/parser/ByteParserTest.java | 4 +- .../flink/types/parser/ByteValueParserTest.java | 4 +- .../flink/types/parser/DoubleParserTest.java | 4 +- .../types/parser/DoubleValueParserTest.java | 4 +- .../flink/types/parser/FloatParserTest.java | 4 +- 
.../types/parser/FloatValueParserTest.java | 4 +- .../flink/types/parser/IntParserTest.java | 4 +- .../flink/types/parser/IntValueParserTest.java | 4 +- .../flink/types/parser/LongParserTest.java | 5 +- .../flink/types/parser/LongValueParserTest.java | 4 +- .../flink/types/parser/ParserTestBase.java | 157 +++++++++++++++---- .../flink/types/parser/ShortParserTest.java | 4 +- .../types/parser/ShortValueParserTest.java | 5 +- .../flink/types/parser/StringParserTest.java | 10 +- .../types/parser/StringValueParserTest.java | 8 +- .../types/parser/VarLengthStringParserTest.java | 50 +++--- .../flink/examples/java/clustering/KMeans.java | 4 +- .../java/graph/ConnectedComponents.java | 2 +- .../examples/java/graph/EnumTrianglesBasic.java | 2 +- .../examples/java/graph/EnumTrianglesOpt.java | 2 +- .../examples/java/graph/PageRankBasic.java | 4 +- .../java/graph/TransitiveClosureNaive.java | 2 +- .../examples/java/ml/LinearRegression.java | 2 +- .../relational/EmptyFieldsCountAccumulator.java | 2 +- .../examples/java/relational/TPCHQuery10.java | 8 +- .../examples/java/relational/TPCHQuery3.java | 6 +- .../java/relational/WebLogAnalysis.java | 6 +- .../examples/scala/clustering/KMeans.scala | 4 +- .../scala/graph/ConnectedComponents.scala | 2 +- .../scala/graph/EnumTrianglesBasic.scala | 2 +- .../examples/scala/graph/EnumTrianglesOpt.scala | 2 +- .../examples/scala/graph/PageRankBasic.scala | 4 +- .../scala/graph/TransitiveClosureNaive.scala | 2 +- .../examples/scala/ml/LinearRegression.scala | 2 +- .../examples/scala/relational/TPCHQuery10.scala | 8 +- .../examples/scala/relational/TPCHQuery3.scala | 6 +- .../scala/relational/WebLogAnalysis.scala | 6 +- .../flink/api/java/io/CsvInputFormat.java | 4 +- .../org/apache/flink/api/java/io/CsvReader.java | 15 +- .../flink/api/java/io/PrimitiveInputFormat.java | 2 +- .../api/java/record/io/CsvInputFormat.java | 7 +- .../flink/api/java/io/CsvInputFormatTest.java | 32 ++-- .../api/java/record/io/CsvInputFormatTest.java | 2 +- 
.../examples/java8/relational/TPCHQuery10.java | 8 +- .../flink/api/scala/ExecutionEnvironment.scala | 4 +- .../ConnectedComponentsWithObjectMapITCase.java | 2 +- .../flink/test/util/testjar/KMeansForTest.java | 4 +- .../flink/api/scala/io/CsvInputFormatTest.scala | 17 +- 68 files changed, 564 insertions(+), 379 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java index 42c1702..a8bace3 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java @@ -63,7 +63,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath) - .fieldDelimiter('|') + .fieldDelimiter("|") .types(String.class, Integer.class, String.class); //output the data with AvroOutputFormat for specific user type http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index 83bdc8d..c6cada8 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -76,7 +76,7 @@ public class ExecutionPlanCreationTest { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0]) - .fieldDelimiter('\t').types(Long.class, Long.class); + .fieldDelimiter("\t").types(Long.class, Long.class); DataSet<Tuple2<Long, Long>> result = input.map( new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() { http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index 52edfa7..d2b4e83 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -20,9 +20,6 @@ package org.apache.flink.api.common.io; import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.charset.IllegalCharsetNameException; -import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; import org.slf4j.Logger; @@ -188,27 +185,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { } public void setDelimiter(String delimiter) { - setDelimiter(delimiter, Charsets.UTF_8); - } - - public void setDelimiter(String delimiter, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { - if (charsetName == null) { - throw new IllegalArgumentException("Charset name must not be null"); - } - - Charset charset = Charset.forName(charsetName); - setDelimiter(delimiter, charset); - } - - public void setDelimiter(String delimiter, Charset charset) { - if (delimiter == null) { - 
throw new IllegalArgumentException("Delimiter must not be null"); - } - if (charset == null) { - throw new IllegalArgumentException("Charset must not be null"); - } - - this.delimiter = delimiter.getBytes(charset); + this.delimiter = delimiter.getBytes(Charsets.UTF_8); } public int getLineLengthLimit() { @@ -281,19 +258,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { String delimString = parameters.getString(RECORD_DELIMITER, null); if (delimString != null) { - String charsetName = parameters.getString(RECORD_DELIMITER_ENCODING, null); - - if (charsetName == null) { - setDelimiter(delimString); - } else { - try { - setDelimiter(delimString, charsetName); - } - catch (UnsupportedCharsetException e) { - throw new IllegalArgumentException("The charset with the name '" + charsetName + - "' is not supported on this TaskManager instance.", e); - } - } + setDelimiter(delimString); } // set the number of samples @@ -639,11 +604,6 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { protected static final String RECORD_DELIMITER = "delimited-format.delimiter"; /** - * The configuration key to set the record delimiter encoding. - */ - private static final String RECORD_DELIMITER_ENCODING = "delimited-format.delimiter-encoding"; - - /** * The configuration key to set the number of samples to take for the statistics. */ private static final String NUM_STATISTICS_SAMPLES = "delimited-format.numSamples"; @@ -713,24 +673,6 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { } /** - * Sets the delimiter to be the given string. The string will be converted to bytes for more efficient - * comparison during input parsing. The conversion will be done using the charset with the given name. - * The charset must be available on the processing nodes, otherwise an exception will be raised at - * runtime. - * - * @param delimiter The delimiter string. 
- * @param charsetName The name of the encoding character set. - * @return The builder itself. - */ - public T recordDelimiter(String delimiter, String charsetName) { - this.config.setString(RECORD_DELIMITER, delimiter); - this.config.setString(RECORD_DELIMITER_ENCODING, charsetName); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** * Sets the number of line samples to take in order to estimate the base statistics for the * input format. * http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index a6ee1da..151b1e2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.io; +import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; import org.apache.flink.core.fs.FileInputSplit; @@ -38,7 +39,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private static final boolean[] EMPTY_INCLUDED = new boolean[0]; - private static final char DEFAULT_FIELD_DELIMITER = ','; + private static final byte[] DEFAULT_FIELD_DELIMITER = new byte[] {','}; + + private static final char QUOTE_CHARACTER = '"'; // -------------------------------------------------------------------------------------------- @@ -57,7 +60,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private boolean[] fieldIncluded = EMPTY_INCLUDED; - private char fieldDelim = DEFAULT_FIELD_DELIMITER; + private byte[] fieldDelim = 
DEFAULT_FIELD_DELIMITER; private boolean lenient; @@ -86,16 +89,24 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> return this.fieldTypes.length; } - public char getFieldDelimiter() { + public byte[] getFieldDelimiter() { return fieldDelim; } - public void setFieldDelimiter(char fieldDelim) { - if (fieldDelim > Byte.MAX_VALUE) { - throw new IllegalArgumentException("The field delimiter must be an ASCII character."); + public void setFieldDelimiter(byte[] delimiter) { + if (delimiter == null) { + throw new IllegalArgumentException("Delimiter must not be null"); } - - this.fieldDelim = fieldDelim; + + this.fieldDelim = delimiter; + } + + public void setFieldDelimiter(char delimiter) { + setFieldDelimiter(String.valueOf(delimiter)); + } + + public void setFieldDelimiter(String delimiter) { + this.fieldDelim = delimiter.getBytes(Charsets.UTF_8); } public boolean isLenient() { @@ -308,7 +319,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> } else { // skip field - startPos = skipFields(bytes, startPos, limit, fieldDelim); + startPos = skipFields(bytes, startPos, limit, this.fieldDelim); if (startPos < 0) { if (!lenient) { String lineAsString = new String(bytes, offset, numBytes); @@ -325,7 +336,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private String fieldTypesToString() { StringBuilder string = new StringBuilder(); string.append(this.fieldTypes[0].toString()); - + for (int i = 1; i < this.fieldTypes.length; i++) { string.append(", ").append(this.fieldTypes[i]); } @@ -333,11 +344,12 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> return string.toString(); } - protected int skipFields(byte[] bytes, int startPos, int limit, char delim) { + protected int skipFields(byte[] bytes, int startPos, int limit, byte[] delim) { + int i = startPos; - final byte delByte = (byte) delim; byte current; + final int delimLimit = limit - 
delim.length + 1; // skip over initial whitespace lines while (i < limit && ((current = bytes[i]) == ' ' || current == '\t')) { @@ -345,11 +357,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> } // first none whitespace character - if (i < limit && bytes[i] == '"') { + if (i < limit && bytes[i] == QUOTE_CHARACTER) { // quoted string i++; // the quote - while (i < limit && bytes[i] != '"') { + while (i < limit && bytes[i] != QUOTE_CHARACTER) { i++; } @@ -358,16 +370,16 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> i++; // the quote // skip trailing whitespace characters - while (i < limit && (current = bytes[i]) != delByte) { + while (i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) { + current = bytes[i]; if (current == ' ' || current == '\t') { i++; - } - else { + } else { return -1; // illegal case of non-whitespace characters trailing } } - return (i == limit ? limit : i+1); + return (i >= delimLimit ? limit : i + delim.length); } else { // exited due to line end without quote termination return -1; @@ -375,10 +387,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> } else { // unquoted field - while (i < limit && bytes[i] != delByte) { + while (i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) { i++; } - return (i == limit ? limit : i+1); + return (i >= delimLimit ? 
limit : i + delim.length); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java index 4be4d7f..5858da2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java @@ -25,25 +25,27 @@ public class ByteParser extends FieldParser<Byte> { private byte result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, Byte reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) { int val = 0; boolean neg = false; + + final int delimLimit = limit-delimiter.length+1; if (bytes[startPos] == '-') { neg = true; startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } - + for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { this.result = (byte) (neg ? 
-val : val); - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java index 85fefb6..f9b36e4 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java @@ -30,27 +30,30 @@ public class ByteValueParser extends FieldParser<ByteValue> { private ByteValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, ByteValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ByteValue reusable) { int val = 0; boolean neg = false; this.result = reusable; + + final int delimLimit = limit-delimiter.length+1; if (bytes[startPos] == '-') { neg = true; startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } - + for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { reusable.setValue((byte) (neg ? 
-val : val)); - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index df768e3..947fdfe 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -29,18 +29,22 @@ public class DoubleParser extends FieldParser<Double> { private double result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, Double reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) { int i = startPos; - final byte delByte = (byte) delimiter; + + final int delimLimit = limit-delimiter.length+1; - while (i < limit && bytes[i] != delByte) { + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + break; + } i++; } String str = new String(bytes, startPos, i-startPos); try { this.result = Double.parseDouble(str); - return (i == limit) ? limit : i+1; + return (i == limit) ? 
limit : i + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index 2f12a0e..e225c1f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -29,12 +29,16 @@ public class DoubleValueParser extends FieldParser<DoubleValue> { private DoubleValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delim, DoubleValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) { int i = startPos; - final byte delByte = (byte) delim; - - while (i < limit && bytes[i] != delByte) { + + final int delimLimit = limit-delimiter.length+1; + + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + break; + } i++; } @@ -43,7 +47,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> { double value = Double.parseDouble(str); reusable.setValue(value); this.result = reusable; - return (i == limit) ? limit : i+1; + return (i == limit) ? 
limit : i + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 44a6014..322f48e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -78,16 +78,16 @@ public abstract class FieldParser<T> { * @param startPos The index where the field starts * @param limit The limit unto which the byte contents is valid for the parser. The limit is the * position one after the last valid byte. - * @param delim Field delimiter character - * @param reuse The an optional reusable field to hold the value + * @param delim The field delimiter character + * @param reuse An optional reusable field to hold the value * * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise. */ - public abstract int parseField(byte[] bytes, int startPos, int limit, char delim, T reuse); + public abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); /** * Gets the parsed field. This method returns the value parsed by the last successful invocation of - * {@link #parseField(byte[], int, int, char, Object)}. It objects are mutable and reused, it will return + * {@link #parseField(byte[], int, int, byte[], Object)}. It objects are mutable and reused, it will return * the object instance that was passed the the parse function. * * @return The latest parsed field. 
@@ -102,6 +102,29 @@ public abstract class FieldParser<T> { public abstract T createValue(); /** + * Checks if the delimiter starts at the given start position of the byte array. + * + * Attention: This method assumes that enough characters follow the start position for the delimiter check! + * + * @param bytes The byte array that holds the value. + * @param startPos The index of the byte array where the check for the delimiter starts. + * @param delim The delimiter to check for. + * + * @return true if a delimiter starts at the given start position, false otherwise. + */ + public static final boolean delimiterNext(byte[] bytes, int startPos, byte[] delim) { + + for(int pos = 0; pos < delim.length; pos++) { + // check each position + if(delim[pos] != bytes[startPos+pos]) { + return false; + } + } + return true; + + } + + /** * Sets the error state of the parser. Called by subclasses of the parser to set the type of error * when failing a parse. * @@ -155,7 +178,7 @@ public abstract class FieldParser<T> { PARSERS.put(String.class, StringParser.class); PARSERS.put(Float.class, FloatParser.class); PARSERS.put(Double.class, DoubleParser.class); - + // value types PARSERS.put(ByteValue.class, ByteValueParser.class); PARSERS.put(ShortValue.class, ShortValueParser.class); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index 6d85d82..7d166c7 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -27,19 +27,23 @@ public class FloatParser extends FieldParser<Float> { private float result; @Override - public int parseField(byte[] bytes, int startPos, int 
limit, char delim, Float reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) { int i = startPos; - final byte delByte = (byte) delim; - - while (i < limit && bytes[i] != delByte) { + + final int delimLimit = limit-delimiter.length+1; + + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + break; + } i++; } String str = new String(bytes, startPos, i-startPos); try { this.result = Float.parseFloat(str); - return (i == limit) ? limit : i+1; + return (i == limit) ? limit : i+ delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index a682301..af16d4c 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -29,12 +29,16 @@ public class FloatValueParser extends FieldParser<FloatValue> { private FloatValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delim, FloatValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) { int i = startPos; - final byte delByte = (byte) delim; - - while (i < limit && bytes[i] != delByte) { + + final int delimLimit = limit-delimiter.length+1; + + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + break; + } i++; } @@ -43,7 +47,7 @@ public class FloatValueParser extends FieldParser<FloatValue> { float value = Float.parseFloat(str); reusable.setValue(value); this.result = 
reusable; - return (i == limit) ? limit : i+1; + return (i == limit) ? limit : i + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java index a5d8c06..c871f4a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java @@ -32,25 +32,27 @@ public class IntParser extends FieldParser<Integer> { private int result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, Integer reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reusable) { long val = 0; boolean neg = false; - + + final int delimLimit = limit-delimiter.length+1; + if (bytes[startPos] == '-') { neg = true; startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || ( startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { this.result = (int) (neg ? 
-val : val); - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); @@ -114,7 +116,7 @@ public class IntParser extends FieldParser<Integer> { } long val = 0; boolean neg = false; - + if (bytes[startPos] == '-') { neg = true; startPos++; http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java index 42f618c..8cb8176 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java @@ -33,27 +33,29 @@ public class IntValueParser extends FieldParser<IntValue> { private IntValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, IntValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, IntValue reusable) { long val = 0; boolean neg = false; - + + final int delimLimit = limit-delimiter.length+1; + this.result = reusable; - + if (bytes[startPos] == '-') { neg = true; startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { reusable.setValue((int) (neg ? 
-val : val)); - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java index 830fb3b..af17f15 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java @@ -28,25 +28,27 @@ public class LongParser extends FieldParser<Long> { private long result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, Long reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) { long val = 0; boolean neg = false; + + final int delimLimit = limit - delimiter.length + 1; if (bytes[startPos] == '-') { neg = true; startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { this.result = neg ? 
-val : val; - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); @@ -63,8 +65,8 @@ public class LongParser extends FieldParser<Long> { if (i+1 >= limit) { return limit; - } else if (bytes[i+1] == delimiter) { - return i+2; + } else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) { + return i + 1 + delimiter.length; } else { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); return -1; http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java index 321cbcb..8b697cc 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java @@ -30,10 +30,12 @@ public class LongValueParser extends FieldParser<LongValue> { private LongValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, LongValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, LongValue reusable) { long val = 0; boolean neg = false; - + + final int delimLimit = limit-delimiter.length+1; + this.result = reusable; if (bytes[startPos] == '-') { @@ -41,16 +43,16 @@ public class LongValueParser extends FieldParser<LongValue> { startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + if (i < 
delimLimit && delimiterNext(bytes, i, delimiter)) { reusable.setValue(neg ? -val : val); - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); @@ -58,7 +60,7 @@ public class LongValueParser extends FieldParser<LongValue> { } val *= 10; val += bytes[i] - 48; - + // check for overflow / underflow if (val < 0) { // this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE @@ -66,9 +68,9 @@ public class LongValueParser extends FieldParser<LongValue> { reusable.setValue(Long.MIN_VALUE); if (i+1 >= limit) { - return limit; - } else if (bytes[i+1] == delimiter) { - return i+2; + return limit; + } else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) { + return i + 1 + delimiter.length; } else { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); return -1; @@ -80,7 +82,7 @@ public class LongValueParser extends FieldParser<LongValue> { } } } - + reusable.setValue(neg ? 
-val : val); return limit; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java index 0d431df..a6f9898 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java @@ -32,25 +32,27 @@ public class ShortParser extends FieldParser<Short> { private short result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, Short reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Short reusable) { int val = 0; boolean neg = false; + + final int delimLimit = limit-delimiter.length+1; if (bytes[startPos] == '-') { neg = true; startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { this.result = (short) (neg ? 
-val : val); - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java index 82272a6..f5168cc 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java @@ -33,9 +33,11 @@ public class ShortValueParser extends FieldParser<ShortValue> { private ShortValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delimiter, ShortValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ShortValue reusable) { int val = 0; boolean neg = false; + + final int delimLimit = limit-delimiter.length+1; this.result = reusable; @@ -44,16 +46,16 @@ public class ShortValueParser extends FieldParser<ShortValue> { startPos++; // check for empty field with only the sign - if (startPos == limit || bytes[startPos] == delimiter) { + if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN); return -1; } } for (int i = startPos; i < limit; i++) { - if (bytes[i] == delimiter) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { reusable.setValue((short) (neg ? 
-val : val)); - return i+1; + return i + delimiter.length; } if (bytes[i] < 48 || bytes[i] > 57) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index a39dbe0..bd2550e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -28,8 +28,8 @@ public class StringParser extends FieldParser<String> { private static final byte WHITESPACE_SPACE = (byte) ' '; private static final byte WHITESPACE_TAB = (byte) '\t'; - - private static final byte QUOTE_DOUBLE = (byte) '"'; + + private static final byte QUOTE_CHARACTER = (byte) '"'; private static enum ParserStates { NONE, IN_QUOTE, STOP @@ -38,12 +38,13 @@ public class StringParser extends FieldParser<String> { private String result; @Override - public int parseField(byte[] bytes, int startPos, int limit, char delim, String reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, String reusable) { int i = startPos; - - final byte delByte = (byte) delim; byte current; + boolean delimiterFound = false; + + final int delimLimit = limit-delimiter.length+1; // count initial whitespace lines while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) { @@ -62,11 +63,14 @@ public class StringParser extends FieldParser<String> { if(endOfCellPosition == limit) { break; } - current = bytes[endOfCellPosition]; - if(current == delByte) { + if(endOfCellPosition < delimLimit && delimiterNext(bytes, endOfCellPosition, delimiter)) { // if we are in a quote do nothing, otherwise we reached 
the end - parserState = parserState == ParserStates.IN_QUOTE ? parserState: ParserStates.STOP; - } else if(current == QUOTE_DOUBLE) { + if (parserState != ParserStates.IN_QUOTE) { + parserState = ParserStates.STOP; + delimiterFound = true; + } + endOfCellPosition += delimiter.length - 1; + } else if(bytes[endOfCellPosition] == QUOTE_CHARACTER) { // we entered a quote if(parserState == ParserStates.IN_QUOTE) { // we end the quote @@ -77,6 +81,7 @@ public class StringParser extends FieldParser<String> { } } } + int delimCorrection = delimiterFound ? delimiter.length : 1; if(parserState == ParserStates.IN_QUOTE) { // exited due to line end without quote termination @@ -84,54 +89,51 @@ public class StringParser extends FieldParser<String> { return -1; } - // boundary of the cell is now // i --> endOfCellPosition // first none whitespace character - if (i < limit && bytes[i] == QUOTE_DOUBLE) { + if (i < limit && bytes[i] == QUOTE_CHARACTER) { // check if there are characters at the end - current = bytes[endOfCellPosition - 1]; + current = bytes[endOfCellPosition - delimCorrection]; // if the character preceding the end of the cell is not a WHITESPACE or the end QUOTE_DOUBLE // there are unquoted characters at the end - if (!(current == WHITESPACE_SPACE || current == WHITESPACE_TAB || current == QUOTE_DOUBLE)) { + if (!(current == WHITESPACE_SPACE || current == WHITESPACE_TAB || current == QUOTE_CHARACTER)) { setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); return -1; // illegal case of non-whitespace characters trailing } // skip trailing whitespace after quote .. 
by moving the cursor backwards int skipAtEnd = 0; - while (bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_SPACE || bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_TAB) { + while (bytes[endOfCellPosition - delimCorrection - skipAtEnd] == WHITESPACE_SPACE || + bytes[endOfCellPosition - delimCorrection - skipAtEnd] == WHITESPACE_TAB) { skipAtEnd++; } // now unescape boolean notEscaped = true; int endOfContent = i + 1; - for(int counter = endOfContent; counter < endOfCellPosition - skipAtEnd; counter++) { - notEscaped = bytes[counter] != QUOTE_DOUBLE || !notEscaped; + for(int counter = endOfContent; counter < endOfCellPosition - delimCorrection - skipAtEnd; counter++) { + notEscaped = bytes[counter] != QUOTE_CHARACTER || !notEscaped; if (notEscaped) { // realign bytes[endOfContent++] = bytes[counter]; } } - this.result = new String(bytes, i + 1, endOfContent - i - 1); - return (endOfCellPosition == limit ? limit : endOfCellPosition + 1); } else { // unquoted string - // set from the beginning. unquoted strings include the leading whitespaces - this.result = new String(bytes, i, endOfCellPosition - i); + this.result = new String(bytes, i, endOfCellPosition - i - (delimCorrection - 1)); return (endOfCellPosition == limit ? 
limit : endOfCellPosition + 1); } } - + @Override public String createValue() { return ""; http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index 8d0febf..7fd7b54 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -36,41 +36,45 @@ public class StringValueParser extends FieldParser<StringValue> { private static final byte QUOTE_DOUBLE = (byte) '"'; private StringValue result; - + @Override - public int parseField(byte[] bytes, int startPos, int length, char delim, StringValue reusable) { - + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, StringValue reusable) { + this.result = reusable; - int i = startPos; - - final byte delByte = (byte) delim; byte current; - + + final int delimLimit = limit-delimiter.length+1; + // count initial whitespace lines - while (i < length && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) { + while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) { i++; } // first none whitespace character - if (i < length && bytes[i] == QUOTE_DOUBLE) { + if (i < limit && bytes[i] == QUOTE_DOUBLE) { // quoted string i++; // the quote // we count only from after the quote int quoteStart = i; - while (i < length && bytes[i] != QUOTE_DOUBLE) { + while (i < limit && bytes[i] != QUOTE_DOUBLE) { i++; } - if (i < length) { + if (i < limit) { // end of the string reusable.setValueAscii(bytes, quoteStart, i-quoteStart); i++; // the quote - // skip trailing whitespace characters - while (i < length && (current = bytes[i]) 
!= delByte) { + // skip trailing whitespace characters + while (i < limit) { + + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + return i+delimiter.length; + } + current = bytes[i]; if (current == WHITESPACE_SPACE || current == WHITESPACE_TAB) { i++; } @@ -79,8 +83,10 @@ public class StringValueParser extends FieldParser<StringValue> { return -1; // illegal case of non-whitespace characters trailing } } - - return (i == length ? length : i+1); + if( i > limit ){ + i--; + } + return (i == limit ? limit : i + delimiter.length); } else { // exited due to line end without quote termination setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING); @@ -88,14 +94,16 @@ public class StringValueParser extends FieldParser<StringValue> { } } else { - // unquoted string - while (i < length && bytes[i] != delByte) { + // unquoted string -delim.length + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + break; + } i++; } - // set from the beginning. unquoted strings include the leading whitespaces reusable.setValueAscii(bytes, startPos, i-startPos); - return (i == length ? length : i+1); + return (i == limit ? 
limit : i + delimiter.length); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index 4d93070..3749645 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -88,7 +88,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(IntValue.class, IntValue.class, IntValue.class, IntValue.class, IntValue.class); format.configure(parameters); @@ -128,7 +128,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(IntValue.class, IntValue.class); format.configure(parameters); @@ -160,12 +160,14 @@ public class GenericCsvInputFormatTest { @Test public void testSparseParse() { try { - final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|"; + final String fileContent = + "111|222|333|444|555|666|777|888|999|000|\n"+ + "000|999|888|777|666|555|444|333|222|111|"; final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(IntValue.class, null, null, IntValue.class, null, null, null, IntValue.class); format.configure(parameters); @@ -203,7 +205,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters 
= new Configuration(); - format.setFieldDelimiter(','); + format.setFieldDelimiter(","); format.setFieldTypesGeneric(LongValue.class, LongValue.class, LongValue.class); format.configure(parameters); format.open(split); @@ -241,7 +243,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldsGeneric(new int[] { 0, 3, 7 }, (Class<? extends Value>[]) new Class[] { IntValue.class, IntValue.class, IntValue.class }); format.configure(parameters); @@ -269,7 +271,62 @@ public class GenericCsvInputFormatTest { fail("Test erroneous"); } } - + + @SuppressWarnings("unchecked") + @Test + public void testSparseParseWithIndicesMultiCharDelimiter() { + try { + final String fileContent = + "111|-|222|-|333|-|444|-|555|-|666|-|777|-|888|-|999|-|000|-|\n"+ + "000|-|999|-|888|-|777|-|666|-|555|-|444|-|333|-|222|-|111\n"+ + "555|-|999|-|888|-|111|-|666|-|555|-|444|-|777|-|222|-|111|-|\n"+ + "22222|-|99999|-|8|-|99999999|-|6666666|-|5|-|4444|-|8|-|22222|-|1\n"; + + final FileInputSplit split = createTempFile(fileContent); + + final Configuration parameters = new Configuration(); + + format.setFieldDelimiter("|-|"); + format.setFieldsGeneric(new int[] { 0, 3, 7 }, + (Class<? 
extends Value>[]) new Class[] { IntValue.class, IntValue.class, IntValue.class }); + format.configure(parameters); + format.open(split); + + Value[] values = createIntValues(3); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(111, ((IntValue) values[0]).getValue()); + assertEquals(444, ((IntValue) values[1]).getValue()); + assertEquals(888, ((IntValue) values[2]).getValue()); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(000, ((IntValue) values[0]).getValue()); + assertEquals(777, ((IntValue) values[1]).getValue()); + assertEquals(333, ((IntValue) values[2]).getValue()); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(555, ((IntValue) values[0]).getValue()); + assertEquals(111, ((IntValue) values[1]).getValue()); + assertEquals(777, ((IntValue) values[2]).getValue()); + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals(22222, ((IntValue) values[0]).getValue()); + assertEquals(99999999, ((IntValue) values[1]).getValue()); + assertEquals(8, ((IntValue) values[2]).getValue()); + + assertNull(format.nextRecord(values)); + assertTrue(format.reachedEnd()); + } catch (Exception ex) { + System.err.println(ex.getMessage()); + ex.printStackTrace(); + fail("Test erroneous"); + } + } + @Test public void testReadTooShortInput() throws IOException { try { @@ -277,7 +334,7 @@ public class GenericCsvInputFormatTest { final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(IntValue.class, IntValue.class, IntValue.class, IntValue.class, IntValue.class); format.configure(parameters); @@ -305,7 +362,7 @@ public class GenericCsvInputFormatTest { final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + 
format.setFieldDelimiter("|"); format.setFieldTypesGeneric(IntValue.class, IntValue.class, IntValue.class, IntValue.class, IntValue.class); format.setLenient(true); @@ -331,7 +388,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(StringValue.class, IntValue.class, StringValue.class, IntValue.class); format.configure(parameters); @@ -362,7 +419,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(StringValue.class, IntValue.class, StringValue.class, IntValue.class); format.setLenient(true); @@ -390,7 +447,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(StringValue.class, null, IntValue.class); format.setLenient(true); @@ -417,7 +474,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class); format.configure(parameters); @@ -460,7 +517,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(IntValue.class, StringValue.class, IntValue.class, StringValue.class); format.setSkipFirstLineAsHeader(true); @@ -492,7 +549,7 @@ public class GenericCsvInputFormatTest { final Configuration parameters = new Configuration(); - format.setFieldDelimiter('|'); + format.setFieldDelimiter("|"); format.setFieldTypesGeneric(IntValue.class, StringValue.class, IntValue.class, StringValue.class); format.setSkipFirstLineAsHeader(true); @@ 
-516,7 +573,6 @@ public class GenericCsvInputFormatTest { fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); } } - private FileInputSplit createTempFile(String content) throws IOException { this.tempFile = File.createTempFile("test_contents", "tmp"); @@ -558,4 +614,5 @@ public class GenericCsvInputFormatTest { return parseRecord(target, bytes, offset, numBytes) ? target : null; } } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java index ff7f473..37d6903 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java @@ -28,14 +28,14 @@ public class ByteParserTest extends ParserTestBase<Byte> { @Override public String[] getValidTestValues() { return new String[] { - "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), String.valueOf(Byte.MIN_VALUE) + "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), String.valueOf(Byte.MIN_VALUE), "19" }; } @Override public Byte[] getValidTestResults() { return new Byte[] { - (byte) 0, (byte)1, (byte)76, (byte) -66, Byte.MAX_VALUE, Byte.MIN_VALUE + (byte) 0, (byte)1, (byte)76, (byte) -66, Byte.MAX_VALUE, Byte.MIN_VALUE, (byte)19 }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java index 3589f2a..a6c315a 100644 --- 
a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java @@ -29,7 +29,7 @@ public class ByteValueParserTest extends ParserTestBase<ByteValue> { @Override public String[] getValidTestValues() { return new String[] { - "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), String.valueOf(Byte.MIN_VALUE) + "0", "1", "76", "-66", String.valueOf(Byte.MAX_VALUE), String.valueOf(Byte.MIN_VALUE), "19" }; } @@ -37,7 +37,7 @@ public class ByteValueParserTest extends ParserTestBase<ByteValue> { public ByteValue[] getValidTestResults() { return new ByteValue[] { new ByteValue((byte) 0), new ByteValue((byte) 1), new ByteValue((byte) 76), new ByteValue((byte) -66), - new ByteValue(Byte.MAX_VALUE), new ByteValue(Byte.MIN_VALUE) + new ByteValue(Byte.MAX_VALUE), new ByteValue(Byte.MIN_VALUE), new ByteValue((byte) 19) }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java index 92f6b21..71e78a0 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java @@ -32,7 +32,7 @@ public class DoubleParserTest extends ParserTestBase<Double> { String.valueOf(Double.MAX_VALUE), String.valueOf(Double.MIN_VALUE), String.valueOf(Double.NEGATIVE_INFINITY), String.valueOf(Double.POSITIVE_INFINITY), String.valueOf(Double.NaN), - "1.234E2", "1.234e3", "1.234E-2" + "1.234E2", "1.234e3", "1.234E-2", "1239" }; } @@ -43,7 +43,7 @@ public class DoubleParserTest extends ParserTestBase<Double> { Double.MAX_VALUE, Double.MIN_VALUE, Double.NEGATIVE_INFINITY, 
Double.POSITIVE_INFINITY, Double.NaN, - 1.234E2, 1.234e3, 1.234E-2 + 1.234E2, 1.234e3, 1.234E-2, 1239d }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java index 21a3e03..120dfac 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java @@ -33,7 +33,7 @@ public class DoubleValueParserTest extends ParserTestBase<DoubleValue> { String.valueOf(Double.MAX_VALUE), String.valueOf(Double.MIN_VALUE), String.valueOf(Double.NEGATIVE_INFINITY), String.valueOf(Double.POSITIVE_INFINITY), String.valueOf(Double.NaN), - "1.234E2", "1.234e3", "1.234E-2" + "1.234E2", "1.234e3", "1.234E-2", "1239" }; } @@ -45,7 +45,7 @@ public class DoubleValueParserTest extends ParserTestBase<DoubleValue> { new DoubleValue(Double.MAX_VALUE), new DoubleValue(Double.MIN_VALUE), new DoubleValue(Double.NEGATIVE_INFINITY), new DoubleValue(Double.POSITIVE_INFINITY), new DoubleValue(Double.NaN), - new DoubleValue(1.234E2), new DoubleValue(1.234e3), new DoubleValue(1.234E-2) + new DoubleValue(1.234E2), new DoubleValue(1.234e3), new DoubleValue(1.234E-2), new DoubleValue(1239d) }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java index 888efca..3c450a5 100644 --- 
a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java @@ -32,7 +32,7 @@ public class FloatParserTest extends ParserTestBase<Float> { String.valueOf(Float.MAX_VALUE), String.valueOf(Float.MIN_VALUE), String.valueOf(Float.NEGATIVE_INFINITY), String.valueOf(Float.POSITIVE_INFINITY), String.valueOf(Float.NaN), - "1.234E2", "1.234e3", "1.234E-2" + "1.234E2", "1.234e3", "1.234E-2", "1239" }; } @@ -43,7 +43,7 @@ public class FloatParserTest extends ParserTestBase<Float> { Float.MAX_VALUE, Float.MIN_VALUE, Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY, Float.NaN, - 1.234E2f, 1.234e3f, 1.234E-2f + 1.234E2f, 1.234e3f, 1.234E-2f, 1239f }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java index 06b4194..be5b5b8 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java @@ -33,7 +33,7 @@ public class FloatValueParserTest extends ParserTestBase<FloatValue> { String.valueOf(Float.MAX_VALUE), String.valueOf(Float.MIN_VALUE), String.valueOf(Float.NEGATIVE_INFINITY), String.valueOf(Float.POSITIVE_INFINITY), String.valueOf(Float.NaN), - "1.234E2", "1.234e3", "1.234E-2" + "1.234E2", "1.234e3", "1.234E-2", "1239" }; } @@ -45,7 +45,7 @@ public class FloatValueParserTest extends ParserTestBase<FloatValue> { new FloatValue(Float.MAX_VALUE), new FloatValue(Float.MIN_VALUE), new FloatValue(Float.NEGATIVE_INFINITY), new FloatValue(Float.POSITIVE_INFINITY), new FloatValue(Float.NaN), - new FloatValue(1.234E2f), new 
FloatValue(1.234e3f), new FloatValue(1.234E-2f) + new FloatValue(1.234E2f), new FloatValue(1.234e3f), new FloatValue(1.234E-2f), new FloatValue(1239f) }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java index 828eb71..6e1d4db 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java @@ -28,14 +28,14 @@ public class IntParserTest extends ParserTestBase<Integer> { @Override public String[] getValidTestValues() { return new String[] { - "0", "1", "576", "-877678", String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE) + "0", "1", "576", "-877678", String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE), "1239" }; } @Override public Integer[] getValidTestResults() { return new Integer[] { - 0, 1, 576, -877678, Integer.MAX_VALUE, Integer.MIN_VALUE + 0, 1, 576, -877678, Integer.MAX_VALUE, Integer.MIN_VALUE, 1239 }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java index 63aabee..e32f704 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java @@ -29,7 +29,7 @@ public class IntValueParserTest extends ParserTestBase<IntValue> { @Override public String[] getValidTestValues() { 
return new String[] { - "0", "1", "576", "-877678", String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE) + "0", "1", "576", "-877678", String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE), "1239" }; } @@ -37,7 +37,7 @@ public class IntValueParserTest extends ParserTestBase<IntValue> { public IntValue[] getValidTestResults() { return new IntValue[] { new IntValue(0), new IntValue(1), new IntValue(576), new IntValue(-877678), - new IntValue(Integer.MAX_VALUE), new IntValue(Integer.MIN_VALUE) + new IntValue(Integer.MAX_VALUE), new IntValue(Integer.MIN_VALUE), new IntValue(1239) }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java index da78d89..4dd116b 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java @@ -29,14 +29,15 @@ public class LongParserTest extends ParserTestBase<Long> { public String[] getValidTestValues() { return new String[] { "0", "1", "576", "-877678", String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE), - String.valueOf(Long.MAX_VALUE), String.valueOf(Long.MIN_VALUE), "7656" + String.valueOf(Long.MAX_VALUE), String.valueOf(Long.MIN_VALUE), "7656", "1239" }; } @Override public Long[] getValidTestResults() { return new Long[] { - 0L, 1L, 576L, -877678L, (long) Integer.MAX_VALUE, (long) Integer.MIN_VALUE, Long.MAX_VALUE, Long.MIN_VALUE, 7656L + 0L, 1L, 576L, -877678L, (long) Integer.MAX_VALUE, (long) Integer.MIN_VALUE, Long.MAX_VALUE, Long.MIN_VALUE, + 7656L, 1239L }; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java index 2b44cc0..fac6f42 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java @@ -30,7 +30,7 @@ public class LongValueParserTest extends ParserTestBase<LongValue> { public String[] getValidTestValues() { return new String[] { "0", "1", "576", "-877678", String.valueOf(Integer.MAX_VALUE), String.valueOf(Integer.MIN_VALUE), - String.valueOf(Long.MAX_VALUE), String.valueOf(Long.MIN_VALUE), "7656" + String.valueOf(Long.MAX_VALUE), String.valueOf(Long.MIN_VALUE), "7656", "1239" }; } @@ -39,7 +39,7 @@ public class LongValueParserTest extends ParserTestBase<LongValue> { return new LongValue[] { new LongValue(0L), new LongValue(1L), new LongValue(576L), new LongValue(-877678L), new LongValue((long) Integer.MAX_VALUE), new LongValue((long) Integer.MIN_VALUE), - new LongValue(Long.MAX_VALUE), new LongValue(Long.MIN_VALUE), new LongValue(7656L) + new LongValue(Long.MAX_VALUE), new LongValue(Long.MIN_VALUE), new LongValue(7656L), new LongValue(1239L) }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java index 00423d5..fb56add 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java +++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java @@ -79,18 +79,34 @@ public abstract class ParserTestBase<T> { T[] results = getValidTestResults(); for (int i = 0; i < testValues.length; i++) { + + FieldParser<T> parser1 = getParser(); + FieldParser<T> parser2 = getParser(); + FieldParser<T> parser3 = getParser(); - FieldParser<T> parser = getParser(); - - byte[] bytes = testValues[i].getBytes(); - int numRead = parser.parseField(bytes, 0, bytes.length, '|', parser.createValue()); - - assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", numRead != -1); - assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); - - T result = parser.getLastResult(); - - assertEquals("Parser parsed wrong.", results[i], result); + byte[] bytes1 = testValues[i].getBytes(); + byte[] bytes2 = testValues[i].getBytes(); + byte[] bytes3 = testValues[i].getBytes(); + + int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue()); + int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&'}, parser2.createValue()); + int numRead3 = parser3.parseField(bytes3, 0, bytes3.length, new byte[] {'9','9','9'}, parser3.createValue()); + + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", numRead1 != -1); + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", numRead2 != -1); + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", numRead3 != -1); + + assertEquals("Invalid number of bytes read returned.", bytes1.length, numRead1); + assertEquals("Invalid number of bytes read returned.", bytes2.length, numRead2); + assertEquals("Invalid number of bytes read returned.", bytes3.length, numRead3); + + T result1 = parser1.getLastResult(); + T result2 = parser2.getLastResult(); + T result3 = parser3.getLastResult(); + + assertEquals("Parser parsed wrong. 
"+testValues[i], results[i], result1); + assertEquals("Parser parsed wrong. "+testValues[i], results[i], result2); + assertEquals("Parser parsed wrong. "+testValues[i], results[i], result3); } } @@ -100,35 +116,75 @@ public abstract class ParserTestBase<T> { fail("Test erroneous: " + e.getMessage()); } } + + @Test + public void testValidStringInIsolationWithEndDelimiter() { + try { + String[] testValues = getValidTestValues(); + T[] results = getValidTestResults(); + + for (int i = 0; i < testValues.length; i++) { + + FieldParser<T> parser1 = getParser(); + FieldParser<T> parser2 = getParser(); + + String testVal1 = testValues[i] + "|"; + String testVal2 = testValues[i] + "&&&&"; + + byte[] bytes1 = testVal1.getBytes(); + byte[] bytes2 = testVal2.getBytes(); + + int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue()); + int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&','&', '&'}, parser2.createValue()); + + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", numRead1 != -1); + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", numRead2 != -1); + + assertEquals("Invalid number of bytes read returned.", bytes1.length, numRead1); + assertEquals("Invalid number of bytes read returned.", bytes2.length, numRead2); + + T result1 = parser1.getLastResult(); + T result2 = parser2.getLastResult(); + + assertEquals("Parser parsed wrong.", results[i], result1); + assertEquals("Parser parsed wrong.", results[i], result2); + } + + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } @Test public void testConcatenated() { try { String[] testValues = getValidTestValues(); T[] results = getValidTestResults(); - - byte[] allBytesWithDelimiter = concatenate(testValues, '|', true); - byte[] allBytesNoDelimiterEnd = concatenate(testValues, ',', false); - + byte[] 
allBytesWithDelimiter = concatenate(testValues, new char[] {'|'}, true); + byte[] allBytesNoDelimiterEnd = concatenate(testValues, new char[] {','}, false); + FieldParser<T> parser1 = getParser(); FieldParser<T> parser2 = getParser(); - + T val1 = parser1.createValue(); T val2 = parser2.createValue(); - + int pos1 = 0; int pos2 = 0; - + for (int i = 0; i < results.length; i++) { - pos1 = parser1.parseField(allBytesWithDelimiter, pos1, allBytesWithDelimiter.length, '|', val1); - pos2 = parser2.parseField(allBytesNoDelimiterEnd, pos2, allBytesNoDelimiterEnd.length, ',', val2); - + pos1 = parser1.parseField(allBytesWithDelimiter, pos1, allBytesWithDelimiter.length, new byte[] {'|'}, val1); + pos2 = parser2.parseField(allBytesNoDelimiterEnd, pos2, allBytesNoDelimiterEnd.length, new byte[] {','}, val2); + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", pos1 != -1); assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", pos2 != -1); - + T result1 = parser1.getLastResult(); T result2 = parser2.getLastResult(); - + assertEquals("Parser parsed wrong.", results[i], result1); assertEquals("Parser parsed wrong.", results[i], result2); } @@ -139,6 +195,43 @@ public abstract class ParserTestBase<T> { fail("Test erroneous: " + e.getMessage()); } } + + @Test + public void testConcatenatedMultiCharDelimiter() { + try { + String[] testValues = getValidTestValues(); + T[] results = getValidTestResults(); + byte[] allBytesWithDelimiter = concatenate(testValues, new char[] {'&','&', '&', '&'}, true); + byte[] allBytesNoDelimiterEnd = concatenate(testValues, new char[] {'9', '9', '9'}, false); + + FieldParser<T> parser1 = getParser(); + FieldParser<T> parser2 = getParser(); + + T val1 = parser1.createValue(); + T val2 = parser2.createValue(); + + int pos1 = 0; + int pos2 = 0; + + for (int i = 0; i < results.length; i++) { + pos1 = parser1.parseField(allBytesWithDelimiter, pos1, allBytesWithDelimiter.length, new byte[] 
{'&','&','&','&'}, val1); + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", pos1 != -1); + T result1 = parser1.getLastResult(); + assertEquals("Parser parsed wrong.", results[i], result1); + + pos2 = parser2.parseField(allBytesNoDelimiterEnd, pos2, allBytesNoDelimiterEnd.length, new byte[] {'9','9','9'}, val2); + assertTrue("Parser declared the valid value " + testValues[i] + " as invalid.", pos2 != -1); + T result2 = parser2.getLastResult(); + assertEquals("Parser parsed wrong.", results[i], result2); + + } + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } @Test public void testInValidStringInIsolation() { @@ -150,7 +243,7 @@ public abstract class ParserTestBase<T> { FieldParser<T> parser = getParser(); byte[] bytes = testValues[i].getBytes(); - int numRead = parser.parseField(bytes, 0, bytes.length, '|', parser.createValue()); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[] {'|'}, parser.createValue()); assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1); } @@ -183,12 +276,12 @@ public abstract class ParserTestBase<T> { testLine[splitPoint] = invalid; System.arraycopy(validValues, splitPoint, testLine, splitPoint + 1, validValues.length - splitPoint); - byte[] bytes = concatenate(testLine, '%', true); + byte[] bytes = concatenate(testLine, new char[] {'%'}, true); // read the valid parts int pos = 0; for (int i = 0; i < splitPoint; i++) { - pos = parser.parseField(bytes, pos, bytes.length, '%', value); + pos = parser.parseField(bytes, pos, bytes.length, new byte[] {'%'}, value); assertTrue("Parser declared the valid value " + validValues[i] + " as invalid.", pos != -1); T result = parser.getLastResult(); @@ -196,7 +289,7 @@ public abstract class ParserTestBase<T> { } // fail on the invalid part - pos = parser.parseField(bytes, pos, bytes.length, '%', value); + pos = 
parser.parseField(bytes, pos, bytes.length, new byte[] {'%'}, value); assertTrue("Parser accepted the invalid value " + invalid + ".", pos == -1); } } @@ -279,14 +372,14 @@ public abstract class ParserTestBase<T> { } } - private static byte[] concatenate(String[] values, char delimiter, boolean delimiterAtEnd) { + private static byte[] concatenate(String[] values, char[] delimiter, boolean delimiterAtEnd) { int len = 0; for (String s : values) { - len += s.length() + 1; + len += s.length() + delimiter.length; } if (!delimiterAtEnd) { - len--; + len-=delimiter.length; } int currPos = 0; @@ -301,10 +394,12 @@ public abstract class ParserTestBase<T> { currPos += numBytes; if (delimiterAtEnd || i < values.length-1) { - result[currPos++] = (byte) delimiter; + for(int j = 0; j < delimiter.length; j++) + result[currPos++] = (byte) delimiter[j]; } } return result; } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java index d9feda0..3f4cd02 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java @@ -28,14 +28,14 @@ public class ShortParserTest extends ParserTestBase<Short> { @Override public String[] getValidTestValues() { return new String[] { - "0", "1", "576", "-8778", String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE) + "0", "1", "576", "-8778", String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE), "119" }; } @Override public Short[] getValidTestResults() { return new Short[] { - (short) 0, (short)1, (short)576, (short) -8778, Short.MAX_VALUE, Short.MIN_VALUE + (short) 0, (short)1, (short)576, (short) -8778, 
Short.MAX_VALUE, Short.MIN_VALUE, (short) 119 }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java index d7db017..44f1589 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java @@ -29,7 +29,7 @@ public class ShortValueParserTest extends ParserTestBase<ShortValue> { @Override public String[] getValidTestValues() { return new String[] { - "0", "1", "576", "-8778", String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE) + "0", "1", "576", "-8778", String.valueOf(Short.MAX_VALUE), String.valueOf(Short.MIN_VALUE), "1239" }; } @@ -37,7 +37,8 @@ public class ShortValueParserTest extends ParserTestBase<ShortValue> { public ShortValue[] getValidTestResults() { return new ShortValue[] { new ShortValue((short) 0), new ShortValue((short) 1), new ShortValue((short) 576), - new ShortValue((short) -8778), new ShortValue(Short.MAX_VALUE), new ShortValue(Short.MIN_VALUE) + new ShortValue((short) -8778), new ShortValue(Short.MAX_VALUE), new ShortValue(Short.MIN_VALUE), + new ShortValue((short)1239) }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java index 0d68730..702f985 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java @@ -29,19 +29,19 @@ public class StringParserTest extends ParserTestBase<String> { public String[] getValidTestValues() { return new String[] { "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", "\"jklmno\"", - "\"ab,cde|fg\"", "\"hij|m|n|op\"", + "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"", " \"abcdefgh\"", " \"i\"\t\t\t", "\t \t\"jklmno\" ", - " \" abcd \" \t " + " \" abcd \" \t ", "Hello9" }; } @Override public String[] getValidTestResults() { return new String[] { - "abcdefgh", "i", "jklmno", "abcdefgh", "i", "jklmno", - "ab,cde|fg", "hij|m|n|op", + "abcdefgh", "i", "jklmno", "abcdefgh", "i", "jklmno", + "ab,cde|fg", "hij|m|n|op", "hij&&m&&n&&op", "abcdefgh", "i", "jklmno", - " abcd " + " abcd ", "Hello9" }; } http://git-wip-us.apache.org/repos/asf/flink/blob/0548a93d/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java index 35332d1..66ad32d 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java @@ -30,9 +30,9 @@ public class StringValueParserTest extends ParserTestBase<StringValue> { public String[] getValidTestValues() { return new String[] { "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", "\"jklmno\"", - "\"ab,cde|fg\"", "\"hij|m|n|op\"", + "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"", " \"abcdefgh\"", " \"i\"\t\t\t", "\t \t\"jklmno\" ", - " \" abcd \" \t " + " \" abcd \" \t ", "Hello9" }; } @@ -41,9 +41,9 @@ public class StringValueParserTest extends ParserTestBase<StringValue> { return new StringValue[] { new StringValue("abcdefgh"), new 
StringValue("i"), new StringValue("jklmno"), new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), - new StringValue("ab,cde|fg"), new StringValue("hij|m|n|op"), + new StringValue("ab,cde|fg"), new StringValue("hij|m|n|op"), new StringValue("hij&&m&&n&&op"), new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), - new StringValue(" abcd ") + new StringValue(" abcd "), new StringValue("Hello9") }; }
