Repository: flink
Updated Branches:
  refs/heads/master b51605686 -> 3507d59f9


[FLINK-4248] [core] [table] CsvTableSource does not support reading 
SqlTimeTypeInfo types

This closes #2303.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3507d59f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3507d59f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3507d59f

Branch: refs/heads/master
Commit: 3507d59f969485dd735487e6bf3eb893b2e3d8ed
Parents: b516056
Author: twalthr <twal...@apache.org>
Authored: Wed Jul 27 14:51:07 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Thu Sep 22 11:09:42 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/types/parser/BigDecParser.java |  31 ++----
 .../apache/flink/types/parser/BigIntParser.java |  44 +++-----
 .../apache/flink/types/parser/DoubleParser.java |  44 +++-----
 .../flink/types/parser/DoubleValueParser.java   |  27 ++---
 .../apache/flink/types/parser/FieldParser.java  |  48 +++++++++
 .../apache/flink/types/parser/FloatParser.java  |  48 +++------
 .../flink/types/parser/FloatValueParser.java    |  27 ++---
 .../flink/types/parser/SqlDateParser.java       | 108 +++++++++++++++++++
 .../flink/types/parser/SqlTimeParser.java       | 102 ++++++++++++++++++
 .../flink/types/parser/SqlTimestampParser.java  | 108 +++++++++++++++++++
 .../typeutils/base/SqlTimeComparatorTest.java   |   2 +-
 .../typeutils/base/SqlTimeSerializerTest.java   |   2 +-
 .../base/SqlTimestampComparatorTest.java        |   6 +-
 .../base/SqlTimestampSerializerTest.java        |   6 +-
 .../flink/types/parser/SqlDateParserTest.java   |  64 +++++++++++
 .../flink/types/parser/SqlTimeParserTest.java   |  63 +++++++++++
 .../types/parser/SqlTimestampParserTest.java    |  69 ++++++++++++
 .../runtime/io/RowCsvInputFormatTest.scala      |  42 +++++++-
 18 files changed, 675 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
index 46a07fa..9c9f57f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
@@ -35,42 +35,27 @@ public class BigDecParser extends FieldParser<BigDecimal> {
 
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, BigDecimal reusable) {
-               int i = startPos;
-
-               final int delimLimit = limit - delimiter.length + 1;
-
-               while (i < limit) {
-                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
-                               if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
-                               break;
-                       }
-                       i++;
-               }
-
-               if (i > startPos &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(i - 1)]))) {
-                       
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
                        return -1;
                }
 
                try {
-                       final int length = i - startPos;
+                       final int length = endPos - startPos;
                        if (reuse == null || reuse.length < length) {
                                reuse = new char[length];
                        }
                        for (int j = 0; j < length; j++) {
                                final byte b = bytes[startPos + j];
                                if ((b < '0' || b > '9') && b != '-' && b != 
'+' && b != '.' && b != 'E' && b != 'e') {
-                                       throw new NumberFormatException();
+                                       
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+                                       return -1;
                                }
                                reuse[j] = (char) bytes[startPos + j];
                        }
 
                        this.result = new BigDecimal(reuse, 0, length);
-                       return (i == limit) ? limit : i + delimiter.length;
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
                } catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
                        return -1;
@@ -96,7 +81,7 @@ public class BigDecParser extends FieldParser<BigDecimal> {
         * @param startPos The offset to start the parsing.
         * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final BigDecimal parseField(byte[] bytes, int startPos, 
int length) {
@@ -113,7 +98,7 @@ public class BigDecParser extends FieldParser<BigDecimal> {
         * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final BigDecimal parseField(byte[] bytes, int startPos, 
int length, char delimiter) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
index 13361c1..11e459a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -34,31 +34,21 @@ public class BigIntParser extends FieldParser<BigInteger> {
 
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, BigInteger reusable) {
-               int i = startPos;
-
-               final int delimLimit = limit - delimiter.length + 1;
-
-               while (i < limit) {
-                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
-                               if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
-                               break;
-                       }
-                       i++;
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
                }
 
-               if (i > startPos &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(i - 1)]))) {
+               if (endPos > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(endPos - 1)]))) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
                        return -1;
                }
 
-               String str = new String(bytes, startPos, i - startPos);
+               String str = new String(bytes, startPos, endPos - startPos);
                try {
                        this.result = new BigInteger(str);
-                       return (i == limit) ? limit : i + delimiter.length;
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
                } catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
                        return -1;
@@ -84,7 +74,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
         * @param startPos The offset to start the parsing.
         * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final BigInteger parseField(byte[] bytes, int startPos, 
int length) {
@@ -101,26 +91,18 @@ public class BigIntParser extends FieldParser<BigInteger> {
         * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final BigInteger parseField(byte[] bytes, int startPos, 
int length, char delimiter) {
-               if (length <= 0) {
-                       throw new NumberFormatException("Invalid input: Empty 
string");
-               }
-               int i = 0;
-               final byte delByte = (byte) delimiter;
-
-               while (i < length && bytes[startPos + i] != delByte) {
-                       i++;
-               }
+               final int limitedLen = nextStringLength(bytes, startPos, 
length, delimiter);
 
-               if (i > 0 &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + i - 1]))) {
+               if (limitedLen > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
                        throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
                }
 
-               String str = new String(bytes, startPos, i);
+               final String str = new String(bytes, startPos, limitedLen);
                return new BigInteger(str);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 8af496d..2474adf 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -33,31 +33,21 @@ public class DoubleParser extends FieldParser<Double> {
 
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Double reusable) {
-               int i = startPos;
-
-               final int delimLimit = limit - delimiter.length + 1;
-
-               while (i < limit) {
-                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
-                               if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
-                               break;
-                       }
-                       i++;
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
                }
 
-               if (i > startPos &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(i - 1)]))) {
+               if (endPos > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(endPos - 1)]))) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
                        return -1;
                }
 
-               String str = new String(bytes, startPos, i - startPos);
+               String str = new String(bytes, startPos, endPos - startPos);
                try {
                        this.result = Double.parseDouble(str);
-                       return (i == limit) ? limit : i + delimiter.length;
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
                } catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
                        return -1;
@@ -83,7 +73,7 @@ public class DoubleParser extends FieldParser<Double> {
         * @param startPos The offset to start the parsing.
         * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final double parseField(byte[] bytes, int startPos, int 
length) {
@@ -100,26 +90,18 @@ public class DoubleParser extends FieldParser<Double> {
         * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final double parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
-               if (length <= 0) {
-                       throw new NumberFormatException("Invalid input: Empty 
string");
-               }
-               int i = 0;
-               final byte delByte = (byte) delimiter;
-
-               while (i < length && bytes[startPos + i] != delByte) {
-                       i++;
-               }
+               final int limitedLen = nextStringLength(bytes, startPos, 
length, delimiter);
 
-               if (i > 0 &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + i - 1]))) {
+               if (limitedLen > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
                        throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
                }
 
-               String str = new String(bytes, startPos, i);
+               final String str = new String(bytes, startPos, limitedLen);
                return Double.parseDouble(str);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 5c657be..10b43c3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -32,34 +32,23 @@ public class DoubleValueParser extends 
FieldParser<DoubleValue> {
        
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, DoubleValue reusable) {
-               
-               int i = startPos;
-
-               final int delimLimit = limit - delimiter.length + 1;
-
-               while (i < limit) {
-                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
-                               if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
-                               break;
-                       }
-                       i++;
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
                }
-               
-               if (i > startPos &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[i - 1]))) {
+
+               if (endPos > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(endPos - 1)]))) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
                        return -1;
                }
 
-               String str = new String(bytes, startPos, i - startPos);
+               String str = new String(bytes, startPos, endPos - startPos);
                try {
                        double value = Double.parseDouble(str);
                        reusable.setValue(value);
                        this.result = reusable;
-                       return (i == limit) ? limit : i + delimiter.length;
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
                }
                catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index a1b9c5f..200d239 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -174,6 +174,49 @@ public abstract class FieldParser<T> {
        public ParseErrorState getErrorState() {
                return this.errorState;
        }
+
+       /**
+        * Returns the end position of a string. Sets the error state if the 
column is empty.
+        *
+        * @return the end position of the string or -1 if an error occurred
+        */
+       protected final int nextStringEndPos(byte[] bytes, int startPos, int 
limit, byte[] delimiter) {
+               int endPos = startPos;
+
+               final int delimLimit = limit - delimiter.length + 1;
+
+               while (endPos < limit) {
+                       if (endPos < delimLimit && delimiterNext(bytes, endPos, 
delimiter)) {
+                               if (endPos == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
+                               break;
+                       }
+                       endPos++;
+               }
+
+               return endPos;
+       }
+
+       /**
+        * Returns the length of a string. Throws an exception if the column is 
empty.
+        *
+        * @return the length of the string
+        */
+       protected static final int nextStringLength(byte[] bytes, int startPos, 
int length, char delimiter) {
+               if (length <= 0) {
+                       throw new IllegalArgumentException("Invalid input: 
Empty string");
+               }
+               int limitedLength = 0;
+               final byte delByte = (byte) delimiter;
+
+               while (limitedLength < length && bytes[startPos + 
limitedLength] != delByte) {
+                       limitedLength++;
+               }
+
+               return limitedLength;
+       }
        
        // 
--------------------------------------------------------------------------------------------
        //  Mapping from types to parsers
@@ -222,5 +265,10 @@ public abstract class FieldParser<T> {
                PARSERS.put(FloatValue.class, FloatValueParser.class);
                PARSERS.put(DoubleValue.class, DoubleValueParser.class);
                PARSERS.put(BooleanValue.class, BooleanValueParser.class);
+
+               // SQL date/time types
+               PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
+               PARSERS.put(java.sql.Date.class, SqlDateParser.class);
+               PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index 3304f24..e76484e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -30,34 +30,22 @@ public class FloatParser extends FieldParser<Float> {
        private float result;
        
        @Override
-       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Float 
-               reusable) {
-
-               int i = startPos;
-
-               final int delimLimit = limit - delimiter.length + 1;
-
-               while (i < limit) {
-                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
-                               if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
-                               break;
-                       }
-                       i++;
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Float reusable) {
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
                }
 
-               if (i > startPos &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[i - 1]))) {
+               if (endPos > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[endPos - 1]))) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
                        return -1;
                }
 
-               String str = new String(bytes, startPos, i - startPos);
+               String str = new String(bytes, startPos, endPos - startPos);
                try {
                        this.result = Float.parseFloat(str);
-                       return (i == limit) ? limit : i + delimiter.length;
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
                } catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
                        return -1;
@@ -83,7 +71,7 @@ public class FloatParser extends FieldParser<Float> {
         * @param startPos The offset to start the parsing.
         * @param length   The length of the byte sequence (counting from the 
offset).
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final float parseField(byte[] bytes, int startPos, int 
length) {
@@ -100,26 +88,18 @@ public class FloatParser extends FieldParser<Float> {
         * @param length    The length of the byte sequence (counting from the 
offset).
         * @param delimiter The delimiter that terminates the field.
         * @return The parsed value.
-        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
         * represents not a correct number.
         */
        public static final float parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
-               if (length <= 0) {
-                       throw new NumberFormatException("Invalid input: Empty 
string");
-               }
-               int i = 0;
-               final byte delByte = (byte) delimiter;
-
-               while (i < length && bytes[startPos + i] != delByte) {
-                       i++;
-               }
+               final int limitedLen = nextStringLength(bytes, startPos, 
length, delimiter);
 
-               if (i > 0 &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + i - 1]))) {
+               if (limitedLen > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
                        throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
                }
 
-               String str = new String(bytes, startPos, i);
+               final String str = new String(bytes, startPos, limitedLen);
                return Float.parseFloat(str);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index 26ee47b..a834f22 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -32,34 +32,23 @@ public class FloatValueParser extends 
FieldParser<FloatValue> {
        
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, FloatValue reusable) {
-               
-               int i = startPos;
-
-               final int delimLimit = limit - delimiter.length + 1;
-
-               while (i < limit) {
-                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
-                               if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
-                                       return -1;
-                               }
-                               break;
-                       }
-                       i++;
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
                }
-               
-               if (i > startPos &&
-                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[i - 1]))) {
+
+               if (endPos > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[endPos - 1]))) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
                        return -1;
                }
 
-               String str = new String(bytes, startPos, i - startPos);
+               String str = new String(bytes, startPos, endPos - startPos);
                try {
                        float value = Float.parseFloat(str);
                        reusable.setValue(value);
                        this.result = reusable;
-                       return (i == limit) ? limit : i + delimiter.length;
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
                }
                catch (NumberFormatException e) {
                        
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
new file mode 100644
index 0000000..859dcf8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Date;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link java.sql.Date}.
+ */
+@PublicEvolving
+public class SqlDateParser extends FieldParser<Date> {
+
+       private static final Date DATE_INSTANCE = new Date(0L);
+
+       private Date result;
+
+       @Override
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Date reusable) {
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
+               }
+
+               if (endPos > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(endPos - 1)]))) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+                       return -1;
+               }
+
+               String str = new String(bytes, startPos, endPos - startPos);
+               try {
+                       this.result = Date.valueOf(str);
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
+               } catch (IllegalArgumentException e) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+                       return -1;
+               }
+       }
+
+       @Override
+       public Date createValue() {
+               return DATE_INSTANCE;
+       }
+
+       @Override
+       public Date getLastResult() {
+               return this.result;
+       }
+
+       /**
+        * Static utility to parse a field of type Date from a byte sequence 
that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
+        * @param startPos The offset to start the parsing.
+        * @param length   The length of the byte sequence (counting from the 
offset).
+        * @return The parsed value.
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
+        * represents not a correct number.
+        */
+       public static final Date parseField(byte[] bytes, int startPos, int 
length) {
+               return parseField(bytes, startPos, length, (char) 0xffff);
+       }
+
+       /**
+        * Static utility to parse a field of type Date from a byte sequence 
that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
+        * @param delimiter The delimiter that terminates the field.
+        * @return The parsed value.
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
+        * represents not a correct number.
+        */
+       public static final Date parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
+               final int limitedLen = nextStringLength(bytes, startPos, 
length, delimiter);
+
+               if (limitedLen > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+                       throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
+               }
+
+               final String str = new String(bytes, startPos, limitedLen);
+               return Date.valueOf(str);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
new file mode 100644
index 0000000..fbddadc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Time;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link Time}.
+ */
+@PublicEvolving
+public class SqlTimeParser extends FieldParser<Time> {
+
+       private static final Time TIME_INSTANCE = new Time(0L);
+
+       private Time result;
+
+       @Override
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Time reusable) {
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
+               }
+
+               String str = new String(bytes, startPos, endPos - startPos);
+               try {
+                       this.result = Time.valueOf(str);
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
+               } catch (IllegalArgumentException e) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+                       return -1;
+               }
+       }
+
+       @Override
+       public Time createValue() {
+               return TIME_INSTANCE;
+       }
+
+       @Override
+       public Time getLastResult() {
+               return this.result;
+       }
+
+       /**
+        * Static utility to parse a field of type Time from a byte sequence 
that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
+        * @param startPos The offset to start the parsing.
+        * @param length   The length of the byte sequence (counting from the 
offset).
+        * @return The parsed value.
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
+        * represents not a correct number.
+        */
+       public static final Time parseField(byte[] bytes, int startPos, int 
length) {
+               return parseField(bytes, startPos, length, (char) 0xffff);
+       }
+
+       /**
+        * Static utility to parse a field of type Time from a byte sequence 
that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
+        * @param delimiter The delimiter that terminates the field.
+        * @return The parsed value.
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
+        * represents not a correct number.
+        */
+       public static final Time parseField(byte[] bytes, int startPos, int 
length, char delimiter) {
+               final int limitedLen = nextStringLength(bytes, startPos, 
length, delimiter);
+
+               if (limitedLen > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+                       throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
+               }
+
+               final String str = new String(bytes, startPos, limitedLen);
+               return Time.valueOf(str);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
 
b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
new file mode 100644
index 0000000..0bcb602
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Timestamp;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link Timestamp}.
+ */
+@PublicEvolving
+public class SqlTimestampParser extends FieldParser<Timestamp> {
+
+       private static final Timestamp TIMESTAMP_INSTANCE = new Timestamp(0L);
+
+       private Timestamp result;
+
+       @Override
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Timestamp reusable) {
+               final int endPos = nextStringEndPos(bytes, startPos, limit, 
delimiter);
+               if (endPos < 0) {
+                       return -1;
+               }
+
+               if (endPos > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(endPos - 1)]))) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+                       return -1;
+               }
+
+               String str = new String(bytes, startPos, endPos - startPos);
+               try {
+                       this.result = Timestamp.valueOf(str);
+                       return (endPos == limit) ? limit : endPos + 
delimiter.length;
+               } catch (IllegalArgumentException e) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+                       return -1;
+               }
+       }
+
+       @Override
+       public Timestamp createValue() {
+               return TIMESTAMP_INSTANCE;
+       }
+
+       @Override
+       public Timestamp getLastResult() {
+               return this.result;
+       }
+
+       /**
+        * Static utility to parse a field of type Timestamp from a byte 
sequence that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
+        * @param startPos The offset to start the parsing.
+        * @param length   The length of the byte sequence (counting from the 
offset).
+        * @return The parsed value.
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
+        * represents not a correct number.
+        */
+       public static final Timestamp parseField(byte[] bytes, int startPos, 
int length) {
+               return parseField(bytes, startPos, length, (char) 0xffff);
+       }
+
+       /**
+        * Static utility to parse a field of type Timestamp from a byte 
sequence that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
+        * @param delimiter The delimiter that terminates the field.
+        * @return The parsed value.
+        * @throws IllegalArgumentException Thrown when the value cannot be 
parsed because the text
+        * represents not a correct number.
+        */
+       public static final Timestamp parseField(byte[] bytes, int startPos, 
int length, char delimiter) {
+               final int limitedLen = nextStringLength(bytes, startPos, 
length, delimiter);
+
+               if (limitedLen > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+                       throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
+               }
+
+               final String str = new String(bytes, startPos, limitedLen);
+               return Timestamp.valueOf(str);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
index 2b5cfdf..8fb3319 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
@@ -40,7 +40,7 @@ public class SqlTimeComparatorTest extends 
ComparatorTestBase<Time> {
        protected Time[] getSortedTestData() {
                return new Time[] {
                        Time.valueOf("00:00:00"),
-                       Time.valueOf("02:42:85"),
+                       Time.valueOf("02:42:25"),
                        Time.valueOf("14:15:59"),
                        Time.valueOf("18:00:45")
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
index 4d16050..bfac789 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
@@ -47,7 +47,7 @@ public class SqlTimeSerializerTest extends 
SerializerTestBase<Time> {
                return new Time[] {
                        new Time(0L),
                        Time.valueOf("00:00:00"),
-                       Time.valueOf("02:42:85"),
+                       Time.valueOf("02:42:25"),
                        Time.valueOf("14:15:59"),
                        Time.valueOf("18:00:45")
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
index 0b8d294..e182d0a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
@@ -40,9 +40,9 @@ public class SqlTimestampComparatorTest extends 
ComparatorTestBase<Timestamp> {
        protected Timestamp[] getSortedTestData() {
                return new Timestamp[] {
                        Timestamp.valueOf("1970-01-01 00:00:00.000"),
-                       Timestamp.valueOf("1990-10-14 02:42:85.123"),
-                       Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
-                       Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+                       Timestamp.valueOf("1990-10-14 02:42:25.123"),
+                       Timestamp.valueOf("1990-10-14 02:42:25.123000001"),
+                       Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
                        Timestamp.valueOf("2013-08-12 14:15:59.478"),
                        Timestamp.valueOf("2013-08-12 14:15:59.479"),
                        Timestamp.valueOf("2040-05-12 18:00:45.999")

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
index 70172d5..e825eaa 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
@@ -47,9 +47,9 @@ public class SqlTimestampSerializerTest extends 
SerializerTestBase<Timestamp> {
                return new Timestamp[] {
                        new Timestamp(0L),
                        Timestamp.valueOf("1970-01-01 00:00:00.000"),
-                       Timestamp.valueOf("1990-10-14 02:42:85.123"),
-                       Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
-                       Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+                       Timestamp.valueOf("1990-10-14 02:42:25.123"),
+                       Timestamp.valueOf("1990-10-14 02:42:25.123000001"),
+                       Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
                        Timestamp.valueOf("2013-08-12 14:15:59.478"),
                        Timestamp.valueOf("2013-08-12 14:15:59.479"),
                        Timestamp.valueOf("2040-05-12 18:00:45.999")

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
new file mode 100644
index 0000000..25015cd
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Date;
+
+public class SqlDateParserTest extends ParserTestBase<Date> {
+
+       @Override
+       public String[] getValidTestValues() {
+               return new String[] {
+                       "1970-01-01", "1990-10-14", "2013-08-12", "2040-05-12", 
"2040-5-12", "1970-1-1",
+               };
+       }
+
+       @Override
+       public Date[] getValidTestResults() {
+               return new Date[] {
+                       Date.valueOf("1970-01-01"), Date.valueOf("1990-10-14"), 
Date.valueOf("2013-08-12"),
+                       Date.valueOf("2040-05-12"), Date.valueOf("2040-05-12"), 
Date.valueOf("1970-01-01")
+               };
+       }
+
+       @Override
+       public String[] getInvalidTestValues() {
+               return new String[] {
+                       " 2013-08-12", "2013-08-12 ", "2013-08--12", 
"13-08-12", "2013/08/12", " ", "\t",
+                       "2013-XX-XX", "2000-02-35"
+               };
+       }
+
+       @Override
+       public boolean allowsEmptyField() {
+               return false;
+       }
+
+       @Override
+       public FieldParser<Date> getParser() {
+               return new SqlDateParser();
+       }
+
+       @Override
+       public Class<Date> getTypeClass() {
+               return Date.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
new file mode 100644
index 0000000..06ebd3d
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Time;
+
+public class SqlTimeParserTest extends ParserTestBase<Time> {
+
+       @Override
+       public String[] getValidTestValues() {
+               return new String[] {
+                       "00:00:00", "02:42:25", "14:15:51", "18:00:45", 
"23:59:58", "0:0:0"
+               };
+       }
+
+       @Override
+       public Time[] getValidTestResults() {
+               return new Time[] {
+                       Time.valueOf("00:00:00"), Time.valueOf("02:42:25"), 
Time.valueOf("14:15:51"),
+                       Time.valueOf("18:00:45"), Time.valueOf("23:59:58"), 
Time.valueOf("0:0:0")
+               };
+       }
+
+       @Override
+       public String[] getInvalidTestValues() {
+               return new String[] {
+                       " 00:00:00", "00:00:00 ", "00:00::00", "00x00:00", 
"2013/08/12", " ", "\t"
+               };
+       }
+
+       @Override
+       public boolean allowsEmptyField() {
+               return false;
+       }
+
+       @Override
+       public FieldParser<Time> getParser() {
+               return new SqlTimeParser();
+       }
+
+       @Override
+       public Class<Time> getTypeClass() {
+               return Time.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
new file mode 100644
index 0000000..0527606
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Timestamp;
+
+public class SqlTimestampParserTest extends ParserTestBase<Timestamp> {
+
+       @Override
+       public String[] getValidTestValues() {
+               return new String[] {
+                       "1970-01-01 00:00:00.000", "1990-10-14 02:42:25", 
"1990-10-14 02:42:25.123", "1990-10-14 02:42:25.123000001",
+                       "1990-10-14 02:42:25.123000002", "2013-08-12 
14:15:59.478", "2013-08-12 14:15:59.47",
+                       "0000-01-01 00:00:00.000",
+               };
+       }
+
+       @Override
+       public Timestamp[] getValidTestResults() {
+               return new Timestamp[] {
+                       Timestamp.valueOf("1970-01-01 00:00:00.000"), 
Timestamp.valueOf("1990-10-14 02:42:25"), Timestamp.valueOf("1990-10-14 
02:42:25.123"),
+                       Timestamp.valueOf("1990-10-14 02:42:25.123000001"), 
Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
+                       Timestamp.valueOf("2013-08-12 14:15:59.478"), 
Timestamp.valueOf("2013-08-12 14:15:59.47"),
+                       Timestamp.valueOf("0000-01-01 00:00:00.000")
+               };
+       }
+
+       @Override
+       public String[] getInvalidTestValues() {
+               return new String[] {
+                       " 2013-08-12 14:15:59.479", "2013-08-12 14:15:59.479 ", 
"1970-01-01 00:00::00",
+                       "00x00:00", "2013/08/12", "0000-01-01 00:00:00.f00", 
"2013-08-12 14:15:59.4788888888888888",
+                       " ", "\t"
+               };
+       }
+
+       @Override
+       public boolean allowsEmptyField() {
+               return false;
+       }
+
+       @Override
+       public FieldParser<Timestamp> getParser() {
+               return new SqlTimestampParser();
+       }
+
+       @Override
+       public Class<Timestamp> getTypeClass() {
+               return Timestamp.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index db01b69..d176b79 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.api.table.runtime.io
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.io.ParseException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
 import org.apache.flink.api.table.Row
 import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, 
createTempFile, testRemovingTrailingCR}
 import org.apache.flink.api.table.typeutils.RowTypeInfo
@@ -786,6 +787,45 @@ class RowCsvInputFormatTest {
     assertEquals("\\\"Hello\\\" World", record.productElement(0))
     assertEquals("We are\\\" young", record.productElement(1))
   }
+
+  @Test
+  def testSqlTimeFields() {
+    val fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 
2:2:5\n" +
+      "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n"
+
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      SqlTimeTypeInfo.DATE,
+      SqlTimeTypeInfo.TIME,
+      SqlTimeTypeInfo.TIMESTAMP,
+      SqlTimeTypeInfo.TIMESTAMP))
+
+    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
+    format.setFieldDelimiter("|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var result = new Row(4)
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
+    assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
+    assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), 
result.productElement(2))
+    assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), 
result.productElement(3))
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
+    assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
+    assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), 
result.productElement(2))
+    assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), 
result.productElement(3))
+
+    result = format.nextRecord(result)
+    assertNull(result)
+    assertTrue(format.reachedEnd)
+  }
 }
 
 object RowCsvInputFormatTest {

Reply via email to