Repository: flink Updated Branches: refs/heads/master b51605686 -> 3507d59f9
[FLINK-4248] [core] [table] CsvTableSource does not support reading SqlTimeTypeInfo types This closes #2303. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3507d59f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3507d59f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3507d59f Branch: refs/heads/master Commit: 3507d59f969485dd735487e6bf3eb893b2e3d8ed Parents: b516056 Author: twalthr <[email protected]> Authored: Wed Jul 27 14:51:07 2016 +0200 Committer: twalthr <[email protected]> Committed: Thu Sep 22 11:09:42 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/types/parser/BigDecParser.java | 31 ++---- .../apache/flink/types/parser/BigIntParser.java | 44 +++----- .../apache/flink/types/parser/DoubleParser.java | 44 +++----- .../flink/types/parser/DoubleValueParser.java | 27 ++--- .../apache/flink/types/parser/FieldParser.java | 48 +++++++++ .../apache/flink/types/parser/FloatParser.java | 48 +++------ .../flink/types/parser/FloatValueParser.java | 27 ++--- .../flink/types/parser/SqlDateParser.java | 108 +++++++++++++++++++ .../flink/types/parser/SqlTimeParser.java | 102 ++++++++++++++++++ .../flink/types/parser/SqlTimestampParser.java | 108 +++++++++++++++++++ .../typeutils/base/SqlTimeComparatorTest.java | 2 +- .../typeutils/base/SqlTimeSerializerTest.java | 2 +- .../base/SqlTimestampComparatorTest.java | 6 +- .../base/SqlTimestampSerializerTest.java | 6 +- .../flink/types/parser/SqlDateParserTest.java | 64 +++++++++++ .../flink/types/parser/SqlTimeParserTest.java | 63 +++++++++++ .../types/parser/SqlTimestampParserTest.java | 69 ++++++++++++ .../runtime/io/RowCsvInputFormatTest.scala | 42 +++++++- 18 files changed, 675 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java index 46a07fa..9c9f57f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java @@ -35,42 +35,27 @@ public class BigDecParser extends FieldParser<BigDecimal> { @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigDecimal reusable) { - int i = startPos; - - final int delimLimit = limit - delimiter.length + 1; - - while (i < limit) { - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_COLUMN); - return -1; - } - break; - } - i++; - } - - if (i > startPos && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) { - setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { return -1; } try { - final int length = i - startPos; + final int length = endPos - startPos; if (reuse == null || reuse.length < length) { reuse = new char[length]; } for (int j = 0; j < length; j++) { final byte b = bytes[startPos + j]; if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') { - throw new NumberFormatException(); + setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); + return -1; } reuse[j] = (char) bytes[startPos + j]; } this.result = new BigDecimal(reuse, 0, length); - return (i == limit) ? limit : i + delimiter.length; + return (endPos == limit) ? limit : endPos + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); return -1; @@ -96,7 +81,7 @@ public class BigDecParser extends FieldParser<BigDecimal> { * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final BigDecimal parseField(byte[] bytes, int startPos, int length) { @@ -113,7 +98,7 @@ public class BigDecParser extends FieldParser<BigDecimal> { * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final BigDecimal parseField(byte[] bytes, int startPos, int length, char delimiter) { http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java index 13361c1..11e459a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java @@ -34,31 +34,21 @@ public class BigIntParser extends FieldParser<BigInteger> { @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigInteger reusable) { - int i = startPos; - - final int delimLimit = limit - delimiter.length + 1; - - while (i < limit) { - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_COLUMN); - return -1; - } - break; - } - i++; + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; } - if (i > startPos && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) { + if (endPos > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); return -1; } - String str = new String(bytes, startPos, i - startPos); + String str = new String(bytes, startPos, endPos - startPos); try { this.result = new BigInteger(str); - return (i == limit) ? limit : i + delimiter.length; + return (endPos == limit) ? limit : endPos + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); return -1; @@ -84,7 +74,7 @@ public class BigIntParser extends FieldParser<BigInteger> { * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final BigInteger parseField(byte[] bytes, int startPos, int length) { @@ -101,26 +91,18 @@ public class BigIntParser extends FieldParser<BigInteger> { * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final BigInteger parseField(byte[] bytes, int startPos, int length, char delimiter) { - if (length <= 0) { - throw new NumberFormatException("Invalid input: Empty string"); - } - int i = 0; - final byte delByte = (byte) delimiter; - - while (i < length && bytes[startPos + i] != delByte) { - i++; - } + final int limitedLen = nextStringLength(bytes, startPos, length, delimiter); - if (i > 0 && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) { + if (limitedLen > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) { throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); } - String str = new String(bytes, startPos, i); + final String str = new String(bytes, startPos, limitedLen); return new BigInteger(str); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index 8af496d..2474adf 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -33,31 +33,21 @@ public class DoubleParser extends FieldParser<Double> { @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) { - int i = startPos; - - final int delimLimit = limit - delimiter.length + 1; - - while (i < limit) { - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_COLUMN); - return -1; - } - break; - } - i++; + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; } - if (i > startPos && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) { + if (endPos > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); return -1; } - String str = new String(bytes, startPos, i - startPos); + String str = new String(bytes, startPos, endPos - startPos); try { this.result = Double.parseDouble(str); - return (i == limit) ? limit : i + delimiter.length; + return (endPos == limit) ? limit : endPos + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); return -1; @@ -83,7 +73,7 @@ public class DoubleParser extends FieldParser<Double> { * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final double parseField(byte[] bytes, int startPos, int length) { @@ -100,26 +90,18 @@ public class DoubleParser extends FieldParser<Double> { * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final double parseField(byte[] bytes, int startPos, int length, char delimiter) { - if (length <= 0) { - throw new NumberFormatException("Invalid input: Empty string"); - } - int i = 0; - final byte delByte = (byte) delimiter; - - while (i < length && bytes[startPos + i] != delByte) { - i++; - } + final int limitedLen = nextStringLength(bytes, startPos, length, delimiter); - if (i > 0 && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) { + if (limitedLen > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) { throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); } - String str = new String(bytes, startPos, i); + final String str = new String(bytes, startPos, limitedLen); return Double.parseDouble(str); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index 5c657be..10b43c3 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -32,34 +32,23 @@ public class DoubleValueParser extends FieldParser<DoubleValue> { @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) { - - int i = startPos; - - final int delimLimit = limit - delimiter.length + 1; - - while (i < limit) { - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_COLUMN); - return -1; - } - break; - } - i++; + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; } - - if (i > startPos && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) { + + if (endPos > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); return -1; } - String str = new String(bytes, startPos, i - startPos); + String str = new String(bytes, startPos, endPos - startPos); try { double value = Double.parseDouble(str); reusable.setValue(value); this.result = reusable; - return (i == limit) ? limit : i + delimiter.length; + return (endPos == limit) ? limit : endPos + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index a1b9c5f..200d239 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -174,6 +174,49 @@ public abstract class FieldParser<T> { public ParseErrorState getErrorState() { return this.errorState; } + + /** + * Returns the end position of a string. Sets the error state if the column is empty. + * + * @return the end position of the string or -1 if an error occurred + */ + protected final int nextStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) { + int endPos = startPos; + + final int delimLimit = limit - delimiter.length + 1; + + while (endPos < limit) { + if (endPos < delimLimit && delimiterNext(bytes, endPos, delimiter)) { + if (endPos == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } + break; + } + endPos++; + } + + return endPos; + } + + /** + * Returns the length of a string. Throws an exception if the column is empty. + * + * @return the length of the string + */ + protected static final int nextStringLength(byte[] bytes, int startPos, int length, char delimiter) { + if (length <= 0) { + throw new IllegalArgumentException("Invalid input: Empty string"); + } + int limitedLength = 0; + final byte delByte = (byte) delimiter; + + while (limitedLength < length && bytes[startPos + limitedLength] != delByte) { + limitedLength++; + } + + return limitedLength; + } // -------------------------------------------------------------------------------------------- // Mapping from types to parsers @@ -222,5 +265,10 @@ public abstract class FieldParser<T> { PARSERS.put(FloatValue.class, FloatValueParser.class); PARSERS.put(DoubleValue.class, DoubleValueParser.class); PARSERS.put(BooleanValue.class, BooleanValueParser.class); + + // SQL date/time types + PARSERS.put(java.sql.Time.class, SqlTimeParser.class); + PARSERS.put(java.sql.Date.class, SqlDateParser.class); + PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index 3304f24..e76484e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -30,34 +30,22 @@ public class FloatParser extends FieldParser<Float> { private float result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float - reusable) { - - int i = startPos; - - final int delimLimit = limit - delimiter.length + 1; - - while (i < limit) { - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_COLUMN); - return -1; - } - break; - } - i++; + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) { + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; } - if (i > startPos && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) { + if (endPos > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[endPos - 1]))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); return -1; } - String str = new String(bytes, startPos, i - startPos); + String str = new String(bytes, startPos, endPos - startPos); try { this.result = Float.parseFloat(str); - return (i == limit) ? limit : i + delimiter.length; + return (endPos == limit) ? limit : endPos + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); return -1; @@ -83,7 +71,7 @@ public class FloatParser extends FieldParser<Float> { * @param startPos The offset to start the parsing. * @param length The length of the byte sequence (counting from the offset). * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final float parseField(byte[] bytes, int startPos, int length) { @@ -100,26 +88,18 @@ public class FloatParser extends FieldParser<Float> { * @param length The length of the byte sequence (counting from the offset). * @param delimiter The delimiter that terminates the field. * @return The parsed value. - * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text * represents not a correct number. */ public static final float parseField(byte[] bytes, int startPos, int length, char delimiter) { - if (length <= 0) { - throw new NumberFormatException("Invalid input: Empty string"); - } - int i = 0; - final byte delByte = (byte) delimiter; - - while (i < length && bytes[startPos + i] != delByte) { - i++; - } + final int limitedLen = nextStringLength(bytes, startPos, length, delimiter); - if (i > 0 && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) { + if (limitedLen > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) { throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); } - String str = new String(bytes, startPos, i); + final String str = new String(bytes, startPos, limitedLen); return Float.parseFloat(str); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index 26ee47b..a834f22 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -32,34 +32,23 @@ public class FloatValueParser extends FieldParser<FloatValue> { @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) { - - int i = startPos; - - final int delimLimit = limit - delimiter.length + 1; - - while (i < limit) { - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_COLUMN); - return -1; - } - break; - } - i++; + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; } - - if (i > startPos && - (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) { + + if (endPos > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[endPos - 1]))) { setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); return -1; } - String str = new String(bytes, startPos, i - startPos); + String str = new String(bytes, startPos, endPos - startPos); try { float value = Float.parseFloat(str); reusable.setValue(value); this.result = reusable; - return (i == limit) ? limit : i + delimiter.length; + return (endPos == limit) ? limit : endPos + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java new file mode 100644 index 0000000..859dcf8 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + +import java.sql.Date; +import org.apache.flink.annotation.PublicEvolving; + +/** + * Parses a text field into a {@link java.sql.Date}. + */ +@PublicEvolving +public class SqlDateParser extends FieldParser<Date> { + + private static final Date DATE_INSTANCE = new Date(0L); + + private Date result; + + @Override + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Date reusable) { + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; + } + + if (endPos > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) { + setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); + return -1; + } + + String str = new String(bytes, startPos, endPos - startPos); + try { + this.result = Date.valueOf(str); + return (endPos == limit) ? limit : endPos + delimiter.length; + } catch (IllegalArgumentException e) { + setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); + return -1; + } + } + + @Override + public Date createValue() { + return DATE_INSTANCE; + } + + @Override + public Date getLastResult() { + return this.result; + } + + /** + * Static utility to parse a field of type Date from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @return The parsed value. + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final Date parseField(byte[] bytes, int startPos, int length) { + return parseField(bytes, startPos, length, (char) 0xffff); + } + + /** + * Static utility to parse a field of type Date from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @param delimiter The delimiter that terminates the field. + * @return The parsed value. + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final Date parseField(byte[] bytes, int startPos, int length, char delimiter) { + final int limitedLen = nextStringLength(bytes, startPos, length, delimiter); + + if (limitedLen > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) { + throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); + } + + final String str = new String(bytes, startPos, limitedLen); + return Date.valueOf(str); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java new file mode 100644 index 0000000..fbddadc --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + +import java.sql.Time; +import org.apache.flink.annotation.PublicEvolving; + +/** + * Parses a text field into a {@link Time}. + */ +@PublicEvolving +public class SqlTimeParser extends FieldParser<Time> { + + private static final Time TIME_INSTANCE = new Time(0L); + + private Time result; + + @Override + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Time reusable) { + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; + } + + String str = new String(bytes, startPos, endPos - startPos); + try { + this.result = Time.valueOf(str); + return (endPos == limit) ? limit : endPos + delimiter.length; + } catch (IllegalArgumentException e) { + setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); + return -1; + } + } + + @Override + public Time createValue() { + return TIME_INSTANCE; + } + + @Override + public Time getLastResult() { + return this.result; + } + + /** + * Static utility to parse a field of type Time from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @return The parsed value. + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final Time parseField(byte[] bytes, int startPos, int length) { + return parseField(bytes, startPos, length, (char) 0xffff); + } + + /** + * Static utility to parse a field of type Time from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @param delimiter The delimiter that terminates the field. + * @return The parsed value. + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final Time parseField(byte[] bytes, int startPos, int length, char delimiter) { + final int limitedLen = nextStringLength(bytes, startPos, length, delimiter); + + if (limitedLen > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) { + throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); + } + + final String str = new String(bytes, startPos, limitedLen); + return Time.valueOf(str); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java new file mode 100644 index 0000000..0bcb602 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + +import java.sql.Timestamp; +import org.apache.flink.annotation.PublicEvolving; + +/** + * Parses a text field into a {@link Timestamp}. + */ +@PublicEvolving +public class SqlTimestampParser extends FieldParser<Timestamp> { + + private static final Timestamp TIMESTAMP_INSTANCE = new Timestamp(0L); + + private Timestamp result; + + @Override + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Timestamp reusable) { + final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter); + if (endPos < 0) { + return -1; + } + + if (endPos > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) { + setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); + return -1; + } + + String str = new String(bytes, startPos, endPos - startPos); + try { + this.result = Timestamp.valueOf(str); + return (endPos == limit) ? limit : endPos + delimiter.length; + } catch (IllegalArgumentException e) { + setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); + return -1; + } + } + + @Override + public Timestamp createValue() { + return TIMESTAMP_INSTANCE; + } + + @Override + public Timestamp getLastResult() { + return this.result; + } + + /** + * Static utility to parse a field of type Timestamp from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @return The parsed value. + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final Timestamp parseField(byte[] bytes, int startPos, int length) { + return parseField(bytes, startPos, length, (char) 0xffff); + } + + /** + * Static utility to parse a field of type Timestamp from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @param delimiter The delimiter that terminates the field. + * @return The parsed value. + * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final Timestamp parseField(byte[] bytes, int startPos, int length, char delimiter) { + final int limitedLen = nextStringLength(bytes, startPos, length, delimiter); + + if (limitedLen > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) { + throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); + } + + final String str = new String(bytes, startPos, limitedLen); + return Timestamp.valueOf(str); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java index 2b5cfdf..8fb3319 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java @@ -40,7 +40,7 @@ public class SqlTimeComparatorTest extends ComparatorTestBase<Time> { protected Time[] getSortedTestData() { return new Time[] { Time.valueOf("00:00:00"), - Time.valueOf("02:42:85"), + Time.valueOf("02:42:25"), Time.valueOf("14:15:59"), Time.valueOf("18:00:45") }; http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java index 4d16050..bfac789 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java @@ -47,7 +47,7 @@ public class SqlTimeSerializerTest extends SerializerTestBase<Time> { return new Time[] { new Time(0L), Time.valueOf("00:00:00"), - Time.valueOf("02:42:85"), + Time.valueOf("02:42:25"), Time.valueOf("14:15:59"), Time.valueOf("18:00:45") }; http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java index 0b8d294..e182d0a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java @@ -40,9 +40,9 @@ public class SqlTimestampComparatorTest extends ComparatorTestBase<Timestamp> { protected Timestamp[] getSortedTestData() { return new Timestamp[] { Timestamp.valueOf("1970-01-01 00:00:00.000"), - Timestamp.valueOf("1990-10-14 02:42:85.123"), - Timestamp.valueOf("1990-10-14 02:42:85.123000001"), - Timestamp.valueOf("1990-10-14 02:42:85.123000002"), + Timestamp.valueOf("1990-10-14 02:42:25.123"), + Timestamp.valueOf("1990-10-14 02:42:25.123000001"), + Timestamp.valueOf("1990-10-14 02:42:25.123000002"), Timestamp.valueOf("2013-08-12 14:15:59.478"), Timestamp.valueOf("2013-08-12 14:15:59.479"), Timestamp.valueOf("2040-05-12 18:00:45.999") http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java index 70172d5..e825eaa 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java @@ -47,9 +47,9 @@ public class SqlTimestampSerializerTest extends SerializerTestBase<Timestamp> { return new Timestamp[] { new Timestamp(0L), Timestamp.valueOf("1970-01-01 00:00:00.000"), - Timestamp.valueOf("1990-10-14 02:42:85.123"), - Timestamp.valueOf("1990-10-14 02:42:85.123000001"), - Timestamp.valueOf("1990-10-14 02:42:85.123000002"), + Timestamp.valueOf("1990-10-14 02:42:25.123"), + Timestamp.valueOf("1990-10-14 02:42:25.123000001"), + Timestamp.valueOf("1990-10-14 02:42:25.123000002"), Timestamp.valueOf("2013-08-12 14:15:59.478"), Timestamp.valueOf("2013-08-12 14:15:59.479"), Timestamp.valueOf("2040-05-12 18:00:45.999") http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java new file mode 100644 index 0000000..25015cd --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + + +import java.sql.Date; + +public class SqlDateParserTest extends ParserTestBase<Date> { + + @Override + public String[] getValidTestValues() { + return new String[] { + "1970-01-01", "1990-10-14", "2013-08-12", "2040-05-12", "2040-5-12", "1970-1-1", + }; + } + + @Override + public Date[] getValidTestResults() { + return new Date[] { + Date.valueOf("1970-01-01"), Date.valueOf("1990-10-14"), Date.valueOf("2013-08-12"), + Date.valueOf("2040-05-12"), Date.valueOf("2040-05-12"), Date.valueOf("1970-01-01") + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { + " 2013-08-12", "2013-08-12 ", "2013-08--12", "13-08-12", "2013/08/12", " ", "\t", + "2013-XX-XX", "2000-02-35" + }; + } + + @Override + public boolean allowsEmptyField() { + return false; + } + + @Override + public FieldParser<Date> getParser() { + return new SqlDateParser(); + } + + @Override + public Class<Date> getTypeClass() { + return Date.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java new file mode 100644 index 0000000..06ebd3d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + + +import java.sql.Time; + +public class SqlTimeParserTest extends ParserTestBase<Time> { + + @Override + public String[] getValidTestValues() { + return new String[] { + "00:00:00", "02:42:25", "14:15:51", "18:00:45", "23:59:58", "0:0:0" + }; + } + + @Override + public Time[] getValidTestResults() { + return new Time[] { + Time.valueOf("00:00:00"), Time.valueOf("02:42:25"), Time.valueOf("14:15:51"), + Time.valueOf("18:00:45"), Time.valueOf("23:59:58"), Time.valueOf("0:0:0") + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { + " 00:00:00", "00:00:00 ", "00:00::00", "00x00:00", "2013/08/12", " ", "\t" + }; + } + + @Override + public boolean allowsEmptyField() { + return false; + } + + @Override + public FieldParser<Time> getParser() { + return new SqlTimeParser(); + } + + @Override + public Class<Time> getTypeClass() { + return Time.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java new file mode 100644 index 0000000..0527606 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + + +import java.sql.Timestamp; + +public class SqlTimestampParserTest extends ParserTestBase<Timestamp> { + + @Override + public String[] getValidTestValues() { + return new String[] { + "1970-01-01 00:00:00.000", "1990-10-14 02:42:25", "1990-10-14 02:42:25.123", "1990-10-14 02:42:25.123000001", + "1990-10-14 02:42:25.123000002", "2013-08-12 14:15:59.478", "2013-08-12 14:15:59.47", + "0000-01-01 00:00:00.000", + }; + } + + @Override + public Timestamp[] getValidTestResults() { + return new Timestamp[] { + Timestamp.valueOf("1970-01-01 00:00:00.000"), Timestamp.valueOf("1990-10-14 02:42:25"), Timestamp.valueOf("1990-10-14 02:42:25.123"), + Timestamp.valueOf("1990-10-14 02:42:25.123000001"), Timestamp.valueOf("1990-10-14 02:42:25.123000002"), + Timestamp.valueOf("2013-08-12 14:15:59.478"), Timestamp.valueOf("2013-08-12 14:15:59.47"), + Timestamp.valueOf("0000-01-01 00:00:00.000") + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { + " 2013-08-12 14:15:59.479", "2013-08-12 14:15:59.479 ", "1970-01-01 00:00::00", + "00x00:00", "2013/08/12", "0000-01-01 00:00:00.f00", "2013-08-12 14:15:59.4788888888888888", + " ", "\t" + }; + } + + @Override + public boolean allowsEmptyField() { + return false; + } + + @Override + public FieldParser<Timestamp> getParser() { + return new SqlTimestampParser(); + } + + @Override + public Class<Timestamp> getTypeClass() { + return Timestamp.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala index db01b69..d176b79 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala @@ -20,9 +20,10 @@ package org.apache.flink.api.table.runtime.io import java.io.{File, FileOutputStream, OutputStreamWriter} import java.nio.charset.StandardCharsets +import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.io.ParseException -import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} import org.apache.flink.api.table.Row import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, createTempFile, testRemovingTrailingCR} import org.apache.flink.api.table.typeutils.RowTypeInfo @@ -786,6 +787,45 @@ class RowCsvInputFormatTest { assertEquals("\\\"Hello\\\" World", record.productElement(0)) assertEquals("We are\\\" young", record.productElement(1)) } + + @Test + def testSqlTimeFields() { + val fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" + + "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n" + + val split = createTempFile(fileContent) + + val typeInfo = new RowTypeInfo(Seq( + SqlTimeTypeInfo.DATE, + SqlTimeTypeInfo.TIME, + SqlTimeTypeInfo.TIMESTAMP, + SqlTimeTypeInfo.TIMESTAMP)) + + val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) + format.setFieldDelimiter("|") + format.configure(new Configuration) + format.open(split) + + var result = new Row(4) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(Date.valueOf("1990-10-14"), result.productElement(0)) + assertEquals(Time.valueOf("02:42:25"), result.productElement(1)) + assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2)) + assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.productElement(3)) + + result = format.nextRecord(result) + assertNotNull(result) + assertEquals(Date.valueOf("1990-10-14"), result.productElement(0)) + assertEquals(Time.valueOf("02:42:25"), result.productElement(1)) + assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2)) + assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.productElement(3)) + + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } } object RowCsvInputFormatTest {
