This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 479fb070edc4a681d85e4c20a964083751aa3720
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed Jul 10 08:52:30 2019 +0200

    [FLINK-13078][table-common] Add a logical type parser
    
    This adds a parser for all logical types defined in FLIP-37.
    
    This closes #9061.
---
 .../flink/table/types/logical/LogicalType.java     |   3 +
 .../types/logical/utils/LogicalTypeParser.java     | 900 +++++++++++++++++++++
 .../apache/flink/table/utils/EncodingUtils.java    |   6 +-
 .../apache/flink/table/utils/TypeStringUtils.java  |  12 +-
 .../flink/table/types/LogicalTypeParserTest.java   | 519 ++++++++++++
 5 files changed, 1437 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 4e4942a..cc46533 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -98,6 +99,8 @@ public abstract class LogicalType implements Serializable {
         * Returns a string that fully serializes this instance. The serialized 
string can be used for
         * transmitting or persisting a type.
         *
+        * <p>See {@link LogicalTypeParser} for the reverse operation.
+        *
         * @return detailed string for transmission or persistence
         */
        public abstract String asSerializableString();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
new file mode 100644
index 0000000..b6fcd07
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import 
org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import 
org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Parser for creating instances of {@link LogicalType} from a serialized 
string created with
+ * {@link LogicalType#asSerializableString()}.
+ *
+ * <p>In addition to the serializable string representations, this parser also 
supports common
+ * shortcuts for certain types. This includes:
+ * <ul>
+ *     <li>{@code STRING} as a synonym for {@code VARCHAR(INT_MAX)}</li>
+ *     <li>{@code BYTES} as a synonym for {@code VARBINARY(INT_MAX)}</li>
+ *     <li>{@code NUMERIC} and {@code DEC} as synonyms for {@code DECIMAL}</li>
+ *     <li>{@code INTEGER} as a synonym for {@code INT}</li>
+ *     <li>{@code DOUBLE PRECISION} as a synonym for {@code DOUBLE}</li>
+ *     <li>{@code TIME WITHOUT TIME ZONE} as a synonym for {@code TIME}</li>
+ *     <li>{@code TIMESTAMP WITHOUT TIME ZONE} as a synonym for {@code 
TIMESTAMP}</li>
+ *     <li>{@code type ARRAY} as a synonym for {@code ARRAY<type>}</li>
+ *     <li>{@code type MULTISET} as a synonym for {@code MULTISET<type>}</li>
+ *     <li>{@code ROW(...)} as a synonym for {@code ROW<...>}</li>
+ *     <li>{@code type NULL} as a synonym for {@code type}</li>
+ * </ul>
+ *
+ * <p>Furthermore, it returns {@link UnresolvedUserDefinedType} for unknown 
types (partially or fully
+ * qualified such as {@code [catalog].[database].[type]}).
+ */
+@PublicEvolving
+public final class LogicalTypeParser {
+
+       /**
+        * Parses a type string. All types will be fully resolved except for 
{@link UnresolvedUserDefinedType}s.
+        *
+        * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+        * @param classLoader class loader for loading classes of the ANY type
+        * @throws ValidationException in case of parsing errors.
+        */
+       public static LogicalType parse(String typeString, ClassLoader 
classLoader) {
+               final List<Token> tokens = tokenize(typeString);
+               final TokenParser converter = new TokenParser(typeString, 
tokens, classLoader);
+               return converter.parseTokens();
+       }
+
+       /**
+        * Parses a type string. All types will be fully resolved except for 
{@link UnresolvedUserDefinedType}s.
+        *
+        * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+        * @throws ValidationException in case of parsing errors.
+        */
+       public static LogicalType parse(String typeString) {
+               return parse(typeString, 
Thread.currentThread().getContextClassLoader());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Tokenizer
+       // 
--------------------------------------------------------------------------------------------
+
+       private static final char CHAR_BEGIN_SUBTYPE = '<';
+       private static final char CHAR_END_SUBTYPE = '>';
+       private static final char CHAR_BEGIN_PARAMETER = '(';
+       private static final char CHAR_END_PARAMETER = ')';
+       private static final char CHAR_LIST_SEPARATOR = ',';
+       private static final char CHAR_STRING = '\'';
+       private static final char CHAR_IDENTIFIER = '`';
+       private static final char CHAR_DOT = '.';
+
+       private static boolean isDelimiter(char character) {
+               return Character.isWhitespace(character) ||
+                       character == CHAR_BEGIN_SUBTYPE ||
+                       character == CHAR_END_SUBTYPE ||
+                       character == CHAR_BEGIN_PARAMETER ||
+                       character == CHAR_END_PARAMETER ||
+                       character == CHAR_LIST_SEPARATOR ||
+                       character == CHAR_DOT;
+       }
+
+       private static boolean isDigit(char c) {
+               return c >= '0' && c <= '9';
+       }
+
+       private static List<Token> tokenize(String chars) {
+               final List<Token> tokens = new ArrayList<>();
+               final StringBuilder builder = new StringBuilder();
+               for (int cursor = 0; cursor < chars.length(); cursor++) {
+                       char curChar = chars.charAt(cursor);
+                       switch (curChar) {
+                               case CHAR_BEGIN_SUBTYPE:
+                                       tokens.add(new 
Token(TokenType.BEGIN_SUBTYPE, cursor, Character.toString(CHAR_BEGIN_SUBTYPE)));
+                                       break;
+                               case CHAR_END_SUBTYPE:
+                                       tokens.add(new 
Token(TokenType.END_SUBTYPE, cursor, Character.toString(CHAR_END_SUBTYPE)));
+                                       break;
+                               case CHAR_BEGIN_PARAMETER:
+                                       tokens.add(new 
Token(TokenType.BEGIN_PARAMETER, cursor, 
Character.toString(CHAR_BEGIN_PARAMETER)));
+                                       break;
+                               case CHAR_END_PARAMETER:
+                                       tokens.add(new 
Token(TokenType.END_PARAMETER, cursor, Character.toString(CHAR_END_PARAMETER)));
+                                       break;
+                               case CHAR_LIST_SEPARATOR:
+                                       tokens.add(new 
Token(TokenType.LIST_SEPARATOR, cursor, 
Character.toString(CHAR_LIST_SEPARATOR)));
+                                       break;
+                               case CHAR_DOT:
+                                       tokens.add(new 
Token(TokenType.IDENTIFIER_SEPARATOR, cursor, Character.toString(CHAR_DOT)));
+                                       break;
+                               case CHAR_STRING:
+                                       builder.setLength(0);
+                                       cursor = consumeEscaped(builder, chars, 
cursor, CHAR_STRING);
+                                       tokens.add(new 
Token(TokenType.LITERAL_STRING, cursor, builder.toString()));
+                                       break;
+                               case CHAR_IDENTIFIER:
+                                       builder.setLength(0);
+                                       cursor = consumeEscaped(builder, chars, 
cursor, CHAR_IDENTIFIER);
+                                       tokens.add(new 
Token(TokenType.IDENTIFIER, cursor, builder.toString()));
+                                       break;
+                               default:
+                                       if (Character.isWhitespace(curChar)) {
+                                               continue;
+                                       }
+                                       if (isDigit(curChar)) {
+                                               builder.setLength(0);
+                                               cursor = consumeInt(builder, 
chars, cursor);
+                                               tokens.add(new 
Token(TokenType.LITERAL_INT, cursor, builder.toString()));
+                                               break;
+                                       }
+                                       builder.setLength(0);
+                                       cursor = consumeIdentifier(builder, 
chars, cursor);
+                                       final String token = builder.toString();
+                                       final String normalizedToken = 
token.toUpperCase();
+                                       if (KEYWORDS.contains(normalizedToken)) 
{
+                                               tokens.add(new 
Token(TokenType.KEYWORD, cursor, normalizedToken));
+                                       } else {
+                                               tokens.add(new 
Token(TokenType.IDENTIFIER, cursor, token));
+                                       }
+                       }
+               }
+
+               return tokens;
+       }
+
+       private static int consumeEscaped(StringBuilder builder, String chars, 
int cursor, char delimiter) {
+               // skip delimiter
+               cursor++;
+               for (; chars.length() > cursor; cursor++) {
+                       final char curChar = chars.charAt(cursor);
+                       if (curChar == delimiter && cursor + 1 < chars.length() 
&& chars.charAt(cursor + 1) == delimiter) {
+                               // escaping of the escaping char e.g. "'Hello 
'' World'"
+                               cursor++;
+                               builder.append(curChar);
+                       } else if (curChar == delimiter) {
+                               break;
+                       } else {
+                               builder.append(curChar);
+                       }
+               }
+               return cursor;
+       }
+
+       private static int consumeInt(StringBuilder builder, String chars, int 
cursor) {
+               for (; chars.length() > cursor && 
isDigit(chars.charAt(cursor)); cursor++) {
+                       builder.append(chars.charAt(cursor));
+               }
+               return cursor - 1;
+       }
+
+       private static int consumeIdentifier(StringBuilder builder, String 
chars, int cursor) {
+               for (; cursor < chars.length() && 
!isDelimiter(chars.charAt(cursor)); cursor++) {
+                       builder.append(chars.charAt(cursor));
+               }
+               return cursor - 1;
+       }
+
+       private enum TokenType {
+               // e.g. "ROW<"
+               BEGIN_SUBTYPE,
+
+               // e.g. "ROW<..>"
+               END_SUBTYPE,
+
+               // e.g. "CHAR("
+               BEGIN_PARAMETER,
+
+               // e.g. "CHAR(...)"
+               END_PARAMETER,
+
+               // e.g. "ROW<INT,"
+               LIST_SEPARATOR,
+
+               // e.g. "ROW<name INT 'Comment'"
+               LITERAL_STRING,
+
+               // CHAR(12
+               LITERAL_INT,
+
+               // e.g. "CHAR" or "TO"
+               KEYWORD,
+
+               // e.g. "ROW<name" or "myCatalog.myDatabase"
+               IDENTIFIER,
+
+               // e.g. "myCatalog.myDatabase."
+               IDENTIFIER_SEPARATOR
+       }
+
+       private enum Keyword {
+               CHAR,
+               VARCHAR,
+               STRING,
+               BOOLEAN,
+               BINARY,
+               VARBINARY,
+               BYTES,
+               DECIMAL,
+               NUMERIC,
+               DEC,
+               TINYINT,
+               SMALLINT,
+               INT,
+               INTEGER,
+               BIGINT,
+               FLOAT,
+               DOUBLE,
+               PRECISION,
+               DATE,
+               TIME,
+               WITH,
+               WITHOUT,
+               LOCAL,
+               ZONE,
+               TIMESTAMP,
+               INTERVAL,
+               YEAR,
+               MONTH,
+               DAY,
+               HOUR,
+               MINUTE,
+               SECOND,
+               TO,
+               ARRAY,
+               MULTISET,
+               MAP,
+               ROW,
+               NULL,
+               ANY,
+               NOT
+       }
+
+       private static final Set<String> KEYWORDS = Stream.of(Keyword.values())
+               .map(k -> k.toString().toUpperCase())
+               .collect(Collectors.toSet());
+
+       private static class Token {
+               public final TokenType type;
+               public final int cursorPosition;
+               public final String value;
+
+               public Token(TokenType type, int cursorPosition, String value) {
+                       this.type = type;
+                       this.cursorPosition = cursorPosition;
+                       this.value = value;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Token Parsing
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class TokenParser {
+
+               private final String inputString;
+
+               private final List<Token> tokens;
+
+               private final ClassLoader classLoader;
+
+               private int lastValidToken;
+
+               private int currentToken;
+
+               public TokenParser(String inputString, List<Token> tokens, 
ClassLoader classLoader) {
+                       this.inputString = inputString;
+                       this.tokens = tokens;
+                       this.classLoader = classLoader;
+                       this.lastValidToken = -1;
+                       this.currentToken = -1;
+               }
+
+               private LogicalType parseTokens() {
+                       final LogicalType type = parseTypeWithNullability();
+                       if (hasRemainingTokens()) {
+                               nextToken();
+                               throw parsingError("Unexpected token: " + 
token().value);
+                       }
+                       return type;
+               }
+
+               private int lastCursor() {
+                       if (lastValidToken < 0) {
+                               return 0;
+                       }
+                       return tokens.get(lastValidToken).cursorPosition + 1;
+               }
+
+               private ValidationException parsingError(String cause, 
@Nullable Throwable t) {
+                       return new ValidationException(
+                               String.format(
+                                       "Could not parse type at position %d: 
%s\n Input type string: %s",
+                                       lastCursor(),
+                                       cause,
+                                       inputString),
+                               t);
+               }
+
+               private ValidationException parsingError(String cause) {
+                       return parsingError(cause, null);
+               }
+
+               private boolean hasRemainingTokens() {
+                       return currentToken + 1 < tokens.size();
+               }
+
+               private Token token() {
+                       return tokens.get(currentToken);
+               }
+
+               private int tokenAsInt() {
+                       try {
+                               return Integer.valueOf(token().value);
+                       } catch (NumberFormatException e) {
+                               throw parsingError("Invalid integer value.", e);
+                       }
+               }
+
+               private Keyword tokenAsKeyword() {
+                       return Keyword.valueOf(token().value);
+               }
+
+               private String tokenAsString() {
+                       return token().value;
+               }
+
+               private void nextToken() {
+                       this.currentToken++;
+                       if (currentToken >= tokens.size()) {
+                               throw parsingError("Unexpected end.");
+                       }
+                       lastValidToken = this.currentToken - 1;
+               }
+
+               private void nextToken(TokenType type) {
+                       nextToken();
+                       final Token token = token();
+                       if (token.type != type) {
+                               throw parsingError("<" + type.name() + "> 
expected but was <" + token.type + ">.");
+                       }
+               }
+
+               private void nextToken(Keyword keyword) {
+                       nextToken(TokenType.KEYWORD);
+                       final Token token = token();
+                       if (!keyword.equals(Keyword.valueOf(token.value))) {
+                               throw parsingError("Keyword '" + keyword + "' 
expected but was '" + token.value + "'.");
+                       }
+               }
+
+               private boolean hasNextToken(TokenType... types) {
+                       if (currentToken + types.length + 1 > tokens.size()) {
+                               return false;
+                       }
+                       for (int i = 0; i < types.length; i++) {
+                               final Token lookAhead = tokens.get(currentToken 
+ i + 1);
+                               if (lookAhead.type != types[i]) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+
+               private boolean hasNextToken(Keyword... keywords) {
+                       if (currentToken + keywords.length + 1 > tokens.size()) 
{
+                               return false;
+                       }
+                       for (int i = 0; i < keywords.length; i++) {
+                               final Token lookAhead = tokens.get(currentToken 
+ i + 1);
+                               if (lookAhead.type != TokenType.KEYWORD ||
+                                               keywords[i] != 
Keyword.valueOf(lookAhead.value)) {
+                                       return false;
+                               }
+                       }
+                       return true;
+               }
+
+               private boolean parseNullability() {
+                       // "NOT NULL"
+                       if (hasNextToken(Keyword.NOT, Keyword.NULL)) {
+                               nextToken(Keyword.NOT);
+                               nextToken(Keyword.NULL);
+                               return false;
+                       }
+                       // explicit "NULL"
+                       else if (hasNextToken(Keyword.NULL)) {
+                               nextToken(Keyword.NULL);
+                               return true;
+                       }
+                       // implicit "NULL"
+                       return true;
+               }
+
+               private LogicalType parseTypeWithNullability() {
+                       final LogicalType logicalType;
+                       if (hasNextToken(TokenType.IDENTIFIER)) {
+                               logicalType = 
parseTypeByIdentifier().copy(parseNullability());
+                       } else {
+                               logicalType = 
parseTypeByKeyword().copy(parseNullability());
+                       }
+
+                       // special case: suffix notation for ARRAY and MULTISET 
types
+                       if (hasNextToken(Keyword.ARRAY)) {
+                               nextToken(Keyword.ARRAY);
+                               return new 
ArrayType(logicalType).copy(parseNullability());
+                       } else if (hasNextToken(Keyword.MULTISET)) {
+                               nextToken(Keyword.MULTISET);
+                               return new 
MultisetType(logicalType).copy(parseNullability());
+                       }
+
+                       return logicalType;
+               }
+
+               private LogicalType parseTypeByKeyword() {
+                       nextToken(TokenType.KEYWORD);
+                       switch (tokenAsKeyword()) {
+                               case CHAR:
+                                       return parseCharType();
+                               case VARCHAR:
+                                       return parseVarCharType();
+                               case STRING:
+                                       return new 
VarCharType(VarCharType.MAX_LENGTH);
+                               case BOOLEAN:
+                                       return new BooleanType();
+                               case BINARY:
+                                       return parseBinaryType();
+                               case VARBINARY:
+                                       return parseVarBinaryType();
+                               case BYTES:
+                                       return new 
VarBinaryType(VarBinaryType.MAX_LENGTH);
+                               case DECIMAL:
+                               case NUMERIC:
+                               case DEC:
+                                       return parseDecimalType();
+                               case TINYINT:
+                                       return new TinyIntType();
+                               case SMALLINT:
+                                       return new SmallIntType();
+                               case INT:
+                               case INTEGER:
+                                       return new IntType();
+                               case BIGINT:
+                                       return new BigIntType();
+                               case FLOAT:
+                                       return new FloatType();
+                               case DOUBLE:
+                                       return parseDoubleType();
+                               case DATE:
+                                       return new DateType();
+                               case TIME:
+                                       return parseTimeType();
+                               case TIMESTAMP:
+                                       return parseTimestampType();
+                               case INTERVAL:
+                                       return parseIntervalType();
+                               case ARRAY:
+                                       return parseArrayType();
+                               case MULTISET:
+                                       return parseMultisetType();
+                               case MAP:
+                                       return parseMapType();
+                               case ROW:
+                                       return parseRowType();
+                               case NULL:
+                                       return new NullType();
+                               case ANY:
+                                       return parseAnyType();
+                               default:
+                                       throw parsingError("Unsupported type: " 
+ token().value);
+                       }
+               }
+
+               private LogicalType parseTypeByIdentifier() {
+                       nextToken(TokenType.IDENTIFIER);
+                       List<String> parts = new ArrayList<>();
+                       parts.add(tokenAsString());
+                       if (hasNextToken(TokenType.IDENTIFIER_SEPARATOR)) {
+                               nextToken(TokenType.IDENTIFIER_SEPARATOR);
+                               nextToken(TokenType.IDENTIFIER);
+                               parts.add(tokenAsString());
+                       }
+                       if (hasNextToken(TokenType.IDENTIFIER_SEPARATOR)) {
+                               nextToken(TokenType.IDENTIFIER_SEPARATOR);
+                               nextToken(TokenType.IDENTIFIER);
+                               parts.add(tokenAsString());
+                       }
+                       return new UnresolvedUserDefinedType(
+                               lastPart(parts, 2),
+                               lastPart(parts, 1),
+                               lastPart(parts, 0));
+               }
+
+               private @Nullable String lastPart(List<String> parts, int 
inversePos) {
+                       final int pos = parts.size() - inversePos - 1;
+                       if (pos < 0) {
+                               return null;
+                       }
+                       return parts.get(pos);
+               }
+
+               private int parseStringType() {
+                       // explicit length
+                       if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+                               nextToken(TokenType.BEGIN_PARAMETER);
+                               nextToken(TokenType.LITERAL_INT);
+                               final int length = tokenAsInt();
+                               nextToken(TokenType.END_PARAMETER);
+                               return length;
+                       }
+                       // implicit length
+                       return -1;
+               }
+
+               private LogicalType parseCharType() {
+                       final int length = parseStringType();
+                       if (length < 0) {
+                               return new CharType();
+                       } else {
+                               return new CharType(length);
+                       }
+               }
+
+               private LogicalType parseVarCharType() {
+                       final int length = parseStringType();
+                       if (length < 0) {
+                               return new VarCharType();
+                       } else {
+                               return new VarCharType(length);
+                       }
+               }
+
+               private LogicalType parseBinaryType() {
+                       final int length = parseStringType();
+                       if (length < 0) {
+                               return new BinaryType();
+                       } else {
+                               return new BinaryType(length);
+                       }
+               }
+
+               private LogicalType parseVarBinaryType() {
+                       final int length = parseStringType();
+                       if (length < 0) {
+                               return new VarBinaryType();
+                       } else {
+                               return new VarBinaryType(length);
+                       }
+               }
+
+               private LogicalType parseDecimalType() {
+                       int precision = DecimalType.DEFAULT_PRECISION;
+                       int scale = DecimalType.DEFAULT_SCALE;
+                       if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+                               nextToken(TokenType.BEGIN_PARAMETER);
+                               nextToken(TokenType.LITERAL_INT);
+                               precision = tokenAsInt();
+                               if (hasNextToken(TokenType.LIST_SEPARATOR)) {
+                                       nextToken(TokenType.LIST_SEPARATOR);
+                                       nextToken(TokenType.LITERAL_INT);
+                                       scale = tokenAsInt();
+                               }
+                               nextToken(TokenType.END_PARAMETER);
+                       }
+                       return new DecimalType(precision, scale);
+               }
+
+               private LogicalType parseDoubleType() {
+                       if (hasNextToken(Keyword.PRECISION)) {
+                               nextToken(Keyword.PRECISION);
+                       }
+                       return new DoubleType();
+               }
+
+               private LogicalType parseTimeType() {
+                       int precision = 
parseOptionalPrecision(TimeType.DEFAULT_PRECISION);
+                       if (hasNextToken(Keyword.WITHOUT)) {
+                               nextToken(Keyword.WITHOUT);
+                               nextToken(Keyword.TIME);
+                               nextToken(Keyword.ZONE);
+                       }
+                       return new TimeType(precision);
+               }
+
+               private LogicalType parseTimestampType() {
+                       int precision = 
parseOptionalPrecision(TimestampType.DEFAULT_PRECISION);
+                       if (hasNextToken(Keyword.WITHOUT)) {
+                               nextToken(Keyword.WITHOUT);
+                               nextToken(Keyword.TIME);
+                               nextToken(Keyword.ZONE);
+                       } else if (hasNextToken(Keyword.WITH)) {
+                               nextToken(Keyword.WITH);
+                               if (hasNextToken(Keyword.LOCAL)) {
+                                       nextToken(Keyword.LOCAL);
+                                       nextToken(Keyword.TIME);
+                                       nextToken(Keyword.ZONE);
+                                       return new 
LocalZonedTimestampType(precision);
+                               } else {
+                                       nextToken(Keyword.TIME);
+                                       nextToken(Keyword.ZONE);
+                                       return new 
ZonedTimestampType(precision);
+                               }
+                       }
+                       return new TimestampType(precision);
+               }
+
+               private LogicalType parseIntervalType() {
+                       nextToken(TokenType.KEYWORD);
+                       switch (tokenAsKeyword()) {
+                               case YEAR:
+                               case MONTH:
+                                       return parseYearMonthIntervalType();
+                               case DAY:
+                               case HOUR:
+                               case MINUTE:
+                               case SECOND:
+                                       return parseDayTimeIntervalType();
+                               default:
+                                       throw parsingError("Invalid interval 
resolution.");
+                       }
+               }
+
+               private LogicalType parseYearMonthIntervalType() {
+                       int yearPrecision = 
YearMonthIntervalType.DEFAULT_PRECISION;
+                       switch (tokenAsKeyword()) {
+                               case YEAR:
+                                       yearPrecision = 
parseOptionalPrecision(yearPrecision);
+                                       if (hasNextToken(Keyword.TO)) {
+                                               nextToken(Keyword.TO);
+                                               nextToken(Keyword.MONTH);
+                                               return new 
YearMonthIntervalType(
+                                                       
YearMonthResolution.YEAR_TO_MONTH,
+                                                       yearPrecision);
+                                       }
+                                       return new YearMonthIntervalType(
+                                               YearMonthResolution.YEAR,
+                                               yearPrecision);
+                               case MONTH:
+                                       return new YearMonthIntervalType(
+                                               YearMonthResolution.MONTH,
+                                               yearPrecision);
+                               default:
+                                       throw parsingError("Invalid year-month 
interval resolution.");
+                       }
+               }
+
+               private LogicalType parseDayTimeIntervalType() {
+                       int dayPrecision = 
DayTimeIntervalType.DEFAULT_DAY_PRECISION;
+                       int fractionalPrecision = 
DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION;
+                       switch (tokenAsKeyword()) {
+                               case DAY:
+                                       dayPrecision = 
parseOptionalPrecision(dayPrecision);
+                                       if (hasNextToken(Keyword.TO)) {
+                                               nextToken(Keyword.TO);
+                                               nextToken(TokenType.KEYWORD);
+                                               switch (tokenAsKeyword()) {
+                                                       case HOUR:
+                                                               return new 
DayTimeIntervalType(
+                                                                       
DayTimeResolution.DAY_TO_HOUR,
+                                                                       
dayPrecision,
+                                                                       
fractionalPrecision);
+                                                       case MINUTE:
+                                                               return new 
DayTimeIntervalType(
+                                                                       
DayTimeResolution.DAY_TO_MINUTE,
+                                                                       
dayPrecision,
+                                                                       
fractionalPrecision);
+                                                       case SECOND:
+                                                               
fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+                                                               return new 
DayTimeIntervalType(
+                                                                       
DayTimeResolution.DAY_TO_SECOND,
+                                                                       
dayPrecision,
+                                                                       
fractionalPrecision);
+                                                       default:
+                                                               throw 
parsingError("Invalid day-time interval resolution.");
+                                               }
+                                       }
+                                       return new DayTimeIntervalType(
+                                               DayTimeResolution.DAY,
+                                               dayPrecision,
+                                               fractionalPrecision);
+                               case HOUR:
+                                       if (hasNextToken(Keyword.TO)) {
+                                               nextToken(Keyword.TO);
+                                               nextToken(TokenType.KEYWORD);
+                                               switch (tokenAsKeyword()) {
+                                                       case MINUTE:
+                                                               return new 
DayTimeIntervalType(
+                                                                       
DayTimeResolution.HOUR_TO_MINUTE,
+                                                                       
dayPrecision,
+                                                                       
fractionalPrecision);
+                                                       case SECOND:
+                                                               
fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+                                                               return new 
DayTimeIntervalType(
+                                                                       
DayTimeResolution.HOUR_TO_SECOND,
+                                                                       
dayPrecision,
+                                                                       
fractionalPrecision);
+                                                       default:
+                                                               throw 
parsingError("Invalid day-time interval resolution.");
+                                               }
+                                       }
+                                       return new DayTimeIntervalType(
+                                               DayTimeResolution.HOUR,
+                                               dayPrecision,
+                                               fractionalPrecision);
+                               case MINUTE:
+                                       if (hasNextToken(Keyword.TO)) {
+                                               nextToken(Keyword.TO);
+                                               nextToken(Keyword.SECOND);
+                                               fractionalPrecision = 
parseOptionalPrecision(fractionalPrecision);
+                                               return new DayTimeIntervalType(
+                                                       
DayTimeResolution.MINUTE_TO_SECOND,
+                                                       dayPrecision,
+                                                       fractionalPrecision);
+                                       }
+                                       return new DayTimeIntervalType(
+                                               DayTimeResolution.MINUTE,
+                                               dayPrecision,
+                                               fractionalPrecision);
+                               case SECOND:
+                                       fractionalPrecision = 
parseOptionalPrecision(fractionalPrecision);
+                                       return new DayTimeIntervalType(
+                                               DayTimeResolution.SECOND,
+                                               dayPrecision,
+                                               fractionalPrecision);
+                               default:
+                                       throw parsingError("Invalid day-time 
interval resolution.");
+                       }
+               }
+
+               private int parseOptionalPrecision(int defaultPrecision) {
+                       int precision = defaultPrecision;
+                       if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+                               nextToken(TokenType.BEGIN_PARAMETER);
+                               nextToken(TokenType.LITERAL_INT);
+                               precision = tokenAsInt();
+                               nextToken(TokenType.END_PARAMETER);
+                       }
+                       return precision;
+               }
+
+               private LogicalType parseArrayType() {
+                       nextToken(TokenType.BEGIN_SUBTYPE);
+                       final LogicalType elementType = 
parseTypeWithNullability();
+                       nextToken(TokenType.END_SUBTYPE);
+                       return new ArrayType(elementType);
+               }
+
+               private LogicalType parseMultisetType() {
+                       nextToken(TokenType.BEGIN_SUBTYPE);
+                       final LogicalType elementType = 
parseTypeWithNullability();
+                       nextToken(TokenType.END_SUBTYPE);
+                       return new MultisetType(elementType);
+               }
+
+               private LogicalType parseMapType() {
+                       nextToken(TokenType.BEGIN_SUBTYPE);
+                       final LogicalType keyType = parseTypeWithNullability();
+                       nextToken(TokenType.LIST_SEPARATOR);
+                       final LogicalType valueType = 
parseTypeWithNullability();
+                       nextToken(TokenType.END_SUBTYPE);
+                       return new MapType(keyType, valueType);
+               }
+
+               private LogicalType parseRowType() {
+                       List<RowType.RowField> fields;
+                       // SQL standard notation
+                       if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+                               nextToken(TokenType.BEGIN_PARAMETER);
+                               fields = 
parseRowFields(TokenType.END_PARAMETER);
+                               nextToken(TokenType.END_PARAMETER);
+                       } else {
+                               nextToken(TokenType.BEGIN_SUBTYPE);
+                               fields = parseRowFields(TokenType.END_SUBTYPE);
+                               nextToken(TokenType.END_SUBTYPE);
+                       }
+                       return new RowType(fields);
+               }
+
+               private List<RowType.RowField> parseRowFields(TokenType 
endToken) {
+                       List<RowType.RowField> fields = new ArrayList<>();
+                       boolean isFirst = true;
+                       while (!hasNextToken(endToken)) {
+                               if (isFirst) {
+                                       isFirst = false;
+                               } else {
+                                       nextToken(TokenType.LIST_SEPARATOR);
+                               }
+                               nextToken(TokenType.IDENTIFIER);
+                               final String name = tokenAsString();
+                               final LogicalType type = 
parseTypeWithNullability();
+                               if (hasNextToken(TokenType.LITERAL_STRING)) {
+                                       nextToken(TokenType.LITERAL_STRING);
+                                       final String description = 
tokenAsString();
+                                       fields.add(new RowType.RowField(name, 
type, description));
+                               } else {
+                                       fields.add(new RowType.RowField(name, 
type));
+                               }
+                       }
+                       return fields;
+               }
+
+               @SuppressWarnings("unchecked")
+               private LogicalType parseAnyType() {
+                       nextToken(TokenType.BEGIN_PARAMETER);
+                       nextToken(TokenType.LITERAL_STRING);
+                       final String className = tokenAsString();
+
+                       nextToken(TokenType.LIST_SEPARATOR);
+                       nextToken(TokenType.LITERAL_STRING);
+                       final String serializer = tokenAsString();
+                       nextToken(TokenType.END_PARAMETER);
+
+                       try {
+                               final Class<?> clazz = Class.forName(className, 
true, classLoader);
+                               final byte[] bytes = 
EncodingUtils.decodeBase64ToBytes(serializer);
+                               final DataInputDeserializer inputDeserializer = 
new DataInputDeserializer(bytes);
+                               final TypeSerializerSnapshot<?> snapshot = 
TypeSerializerSnapshot.readVersionedSnapshot(
+                                       inputDeserializer,
+                                       classLoader);
+                               return new AnyType(clazz, 
snapshot.restoreSerializer());
+                       } catch (Throwable t) {
+                               throw parsingError(
+                                       "Unable to restore the ANY type of 
class '" + className + "' with " +
+                                               "serializer snapshot '" + 
serializer + "'.", t);
+                       }
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 95d10e3..ca88427 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -110,12 +110,16 @@ public abstract class EncodingUtils {
                return new String(java.util.Base64.getEncoder().encode(bytes), 
UTF_8);
        }
 
+       public static byte[] decodeBase64ToBytes(String base64) {
+               return 
java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8));
+       }
+
        public static String encodeStringToBase64(String string) {
                return encodeBytesToBase64(string.getBytes(UTF_8));
        }
 
        public static String decodeBase64ToString(String base64) {
-               return new 
String(java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8)), UTF_8);
+               return new String(decodeBase64ToBytes(base64), UTF_8);
        }
 
        public static byte[] md5(String string) {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
index faf26d0..6bf414a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
@@ -32,13 +32,21 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
-  * Utilities to convert {@link TypeInformation} into a string representation 
and back.
-  */
+ * Utilities to convert {@link TypeInformation} into a string representation 
and back.
+ *
+ * @deprecated This utility is based on {@link TypeInformation}. However, the 
Table & SQL API is
+ *             currently updated to use {@link DataType}s based on {@link 
LogicalType}s. Use
+ *             {@link LogicalTypeParser} instead.
+ */
+@Deprecated
 @PublicEvolving
 public class TypeStringUtils {
 
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
new file mode 100644
index 0000000..df55424
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import 
org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import 
org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.UNRESOLVED;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeParser}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeParserTest {
+
+       @Parameters(name = "{index}: [From: {0}, To: {1}]")
+       public static List<TestSpec> testData() {
+               return Arrays.asList(
+
+                       TestSpec
+                               .forString("CHAR")
+                               .expectType(new CharType()),
+
+                       TestSpec
+                               .forString("CHAR NOT NULL")
+                               .expectType(new  CharType().copy(false)),
+
+                       TestSpec
+                               .forString("CHAR   NOT \t\nNULL")
+                               .expectType(new  CharType().copy(false)),
+
+                       TestSpec
+                               .forString("char not null")
+                               .expectType(new CharType().copy(false)),
+
+                       TestSpec
+                               .forString("CHAR NULL")
+                               .expectType(new CharType()),
+
+                       TestSpec
+                               .forString("CHAR(33)")
+                               .expectType(new CharType(33)),
+
+                       TestSpec
+                               .forString("VARCHAR")
+                               .expectType(new VarCharType()),
+
+                       TestSpec
+                               .forString("VARCHAR(33)")
+                               .expectType(new VarCharType(33)),
+
+                       TestSpec
+                               .forString("STRING")
+                               .expectType(new 
VarCharType(VarCharType.MAX_LENGTH)),
+
+                       TestSpec
+                               .forString("BOOLEAN")
+                               .expectType(new BooleanType()),
+
+                       TestSpec
+                               .forString("BINARY")
+                               .expectType(new BinaryType()),
+
+                       TestSpec
+                               .forString("BINARY(33)")
+                               .expectType(new BinaryType(33)),
+
+                       TestSpec
+                               .forString("VARBINARY")
+                               .expectType(new VarBinaryType()),
+
+                       TestSpec
+                               .forString("VARBINARY(33)")
+                               .expectType(new VarBinaryType(33)),
+
+                       TestSpec
+                               .forString("BYTES")
+                               .expectType(new 
VarBinaryType(VarBinaryType.MAX_LENGTH)),
+
+                       TestSpec
+                               .forString("DECIMAL")
+                               .expectType(new DecimalType()),
+
+                       TestSpec
+                               .forString("DEC")
+                               .expectType(new DecimalType()),
+
+                       TestSpec
+                               .forString("NUMERIC")
+                               .expectType(new DecimalType()),
+
+                       TestSpec
+                               .forString("DECIMAL(10)")
+                               .expectType(new DecimalType(10)),
+
+                       TestSpec
+                               .forString("DEC(10)")
+                               .expectType(new DecimalType(10)),
+
+                       TestSpec
+                               .forString("NUMERIC(10)")
+                               .expectType(new DecimalType(10)),
+
+                       TestSpec
+                               .forString("DECIMAL(10, 3)")
+                               .expectType(new DecimalType(10, 3)),
+
+                       TestSpec
+                               .forString("DEC(10, 3)")
+                               .expectType(new DecimalType(10, 3)),
+
+                       TestSpec
+                               .forString("NUMERIC(10, 3)")
+                               .expectType(new DecimalType(10, 3)),
+
+                       TestSpec
+                               .forString("TINYINT")
+                               .expectType(new TinyIntType()),
+
+                       TestSpec
+                               .forString("SMALLINT")
+                               .expectType(new SmallIntType()),
+
+                       TestSpec
+                               .forString("INTEGER")
+                               .expectType(new IntType()),
+
+                       TestSpec
+                               .forString("INT")
+                               .expectType(new IntType()),
+
+                       TestSpec
+                               .forString("BIGINT")
+                               .expectType(new BigIntType()),
+
+                       TestSpec
+                               .forString("FLOAT")
+                               .expectType(new FloatType()),
+
+                       TestSpec
+                               .forString("DOUBLE")
+                               .expectType(new DoubleType()),
+
+                       TestSpec
+                               .forString("DOUBLE PRECISION")
+                               .expectType(new DoubleType()),
+
+                       TestSpec
+                               .forString("DATE")
+                               .expectType(new DateType()),
+
+                       TestSpec
+                               .forString("TIME")
+                               .expectType(new TimeType()),
+
+                       TestSpec
+                               .forString("TIME(3)")
+                               .expectType(new TimeType(3)),
+
+                       TestSpec
+                               .forString("TIME WITHOUT TIME ZONE")
+                               .expectType(new TimeType()),
+
+                       TestSpec
+                               .forString("TIME(3) WITHOUT TIME ZONE")
+                               .expectType(new TimeType(3)),
+
+                       TestSpec
+                               .forString("TIMESTAMP")
+                               .expectType(new TimestampType()),
+
+                       TestSpec
+                               .forString("TIMESTAMP(3)")
+                               .expectType(new TimestampType(3)),
+
+                       TestSpec
+                               .forString("TIMESTAMP WITHOUT TIME ZONE")
+                               .expectType(new TimestampType()),
+
+                       TestSpec
+                               .forString("TIMESTAMP(3) WITHOUT TIME ZONE")
+                               .expectType(new TimestampType(3)),
+
+                       TestSpec
+                               .forString("TIMESTAMP WITH TIME ZONE")
+                               .expectType(new ZonedTimestampType()),
+
+                       TestSpec
+                               .forString("TIMESTAMP(3) WITH TIME ZONE")
+                               .expectType(new ZonedTimestampType(3)),
+
+                       TestSpec
+                               .forString("TIMESTAMP WITH LOCAL TIME ZONE")
+                               .expectType(new LocalZonedTimestampType()),
+
+                       TestSpec
+                               .forString("TIMESTAMP(3) WITH LOCAL TIME ZONE")
+                               .expectType(new LocalZonedTimestampType(3)),
+
+                       TestSpec
+                               .forString("INTERVAL YEAR")
+                               .expectType(new 
YearMonthIntervalType(YearMonthResolution.YEAR)),
+
+                       TestSpec
+                               .forString("INTERVAL YEAR(4)")
+                               .expectType(new 
YearMonthIntervalType(YearMonthResolution.YEAR, 4)),
+
+                       TestSpec
+                               .forString("INTERVAL MONTH")
+                               .expectType(new 
YearMonthIntervalType(YearMonthResolution.MONTH)),
+
+                       TestSpec
+                               .forString("INTERVAL YEAR TO MONTH")
+                               .expectType(new 
YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH)),
+
+                       TestSpec
+                               .forString("INTERVAL YEAR(4) TO MONTH")
+                               .expectType(new 
YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, 4)),
+
+                       TestSpec
+                               .forString("INTERVAL DAY(2) TO SECOND(3)")
+                               .expectType(new 
DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND, 2, 3)),
+
+                       TestSpec
+                               .forString("INTERVAL HOUR TO SECOND(3)")
+                               .expectType(
+                                       new DayTimeIntervalType(
+                                               
DayTimeResolution.HOUR_TO_SECOND,
+                                               
DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+                                               3)
+                               ),
+
+                       TestSpec
+                               .forString("INTERVAL MINUTE")
+                               .expectType(new 
DayTimeIntervalType(DayTimeResolution.MINUTE)),
+
+                       TestSpec
+                               .forString("ARRAY<TIMESTAMP(3) WITH LOCAL TIME 
ZONE>")
+                               .expectType(new ArrayType(new 
LocalZonedTimestampType(3))),
+
+                       TestSpec
+                               .forString("ARRAY<INT NOT NULL>")
+                               .expectType(new ArrayType(new IntType(false))),
+
+                       TestSpec
+                               .forString("INT ARRAY")
+                               .expectType(new ArrayType(new IntType())),
+
+                       TestSpec
+                               .forString("INT NOT NULL ARRAY")
+                               .expectType(new ArrayType(new IntType(false))),
+
+                       TestSpec
+                               .forString("INT ARRAY NOT NULL")
+                               .expectType(new ArrayType(false, new 
IntType())),
+
+                       TestSpec
+                               .forString("MULTISET<INT NOT NULL>")
+                               .expectType(new MultisetType(new 
IntType(false))),
+
+                       TestSpec
+                               .forString("INT MULTISET")
+                               .expectType(new MultisetType(new IntType())),
+
+                       TestSpec
+                               .forString("INT NOT NULL MULTISET")
+                               .expectType(new MultisetType(new 
IntType(false))),
+
+                       TestSpec
+                               .forString("INT MULTISET NOT NULL")
+                               .expectType(new MultisetType(false, new 
IntType())),
+
+                       TestSpec
+                               .forString("MAP<BIGINT, BOOLEAN>")
+                               .expectType(new MapType(new BigIntType(), new 
BooleanType())),
+
+                       TestSpec
+                               .forString("ROW<f0 INT NOT NULL, f1 BOOLEAN>")
+                               .expectType(
+                                       new RowType(
+                                               Arrays.asList(
+                                                       new 
RowType.RowField("f0", new IntType(false)),
+                                                       new 
RowType.RowField("f1", new BooleanType())))
+                               ),
+
+                       TestSpec
+                               .forString("ROW(f0 INT NOT NULL, f1 BOOLEAN)")
+                               .expectType(
+                                       new RowType(
+                                               Arrays.asList(
+                                                       new 
RowType.RowField("f0", new IntType(false)),
+                                                       new 
RowType.RowField("f1", new BooleanType())))
+                               ),
+
+                       TestSpec
+                               .forString("ROW<`f0` INT>")
+                               .expectType(
+                                       new RowType(
+                                               Collections.singletonList(new 
RowType.RowField("f0", new IntType())))
+                               ),
+
+                       TestSpec
+                               .forString("ROW(`f0` INT)")
+                               .expectType(
+                                       new RowType(
+                                               Collections.singletonList(new 
RowType.RowField("f0", new IntType())))
+                               ),
+
+                       TestSpec
+                               .forString("ROW<>")
+                               .expectType(new 
RowType(Collections.emptyList())),
+
+                       TestSpec
+                               .forString("ROW()")
+                               .expectType(new 
RowType(Collections.emptyList())),
+
+                       TestSpec
+                               .forString("ROW<f0 INT NOT NULL 'This is a 
comment.', f1 BOOLEAN 'This as well.'>")
+                               .expectType(
+                                       new RowType(
+                                               Arrays.asList(
+                                                       new 
RowType.RowField("f0", new IntType(false), "This is a comment."),
+                                                       new 
RowType.RowField("f1", new BooleanType(), "This as well.")))
+                               ),
+
+                       TestSpec
+                               .forString("NULL")
+                               .expectType(new NullType()),
+
+                       TestSpec
+                               
.forString(createAnyType(LogicalTypeParserTest.class).asSerializableString())
+                               
.expectType(createAnyType(LogicalTypeParserTest.class)),
+
+                       TestSpec
+                               .forString("cat.db.MyType")
+                               .expectType(new 
UnresolvedUserDefinedType("cat", "db", "MyType")),
+
+                       TestSpec
+                               .forString("`db`.`MyType`")
+                               .expectType(new UnresolvedUserDefinedType(null, 
"db", "MyType")),
+
+                       TestSpec
+                               .forString("MyType")
+                               .expectType(new UnresolvedUserDefinedType(null, 
null, "MyType")),
+
+                       TestSpec
+                               .forString("ARRAY<MyType>")
+                               .expectType(new ArrayType(new 
UnresolvedUserDefinedType(null, null, "MyType"))),
+
+                       TestSpec
+                               .forString("ROW<f0 MyType, f1 `c`.`d`.`t`>")
+                               .expectType(
+                                       RowType.of(
+                                               new 
UnresolvedUserDefinedType(null, null, "MyType"),
+                                               new 
UnresolvedUserDefinedType("c", "d", "t"))
+                               ),
+
+                       // error message testing
+
+                       TestSpec
+                               .forString("ROW<`f0")
+                               .expectErrorMessage("Unexpected end"),
+
+                       TestSpec
+                               .forString("ROW<`f0`")
+                               .expectErrorMessage("Unexpected end"),
+
+                       TestSpec
+                               .forString("VARCHAR(test)")
+                               .expectErrorMessage("<LITERAL_INT> expected"),
+
+                       TestSpec
+                               .forString("VARCHAR(33333333333)")
+                               .expectErrorMessage("Invalid integer value"),
+
+                       TestSpec
+                               .forString("ROW<field INT, field2>")
+                               .expectErrorMessage("<KEYWORD> expected"),
+
+                       TestSpec
+                               .forString("ANY('unknown.class', '')")
+                               .expectErrorMessage("Unable to restore the ANY 
type")
+               );
+       }
+
+       @Parameter
+       public TestSpec testSpec;
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void testParsing() {
+               if (testSpec.expectedType != null) {
+                       assertThat(
+                               LogicalTypeParser.parse(testSpec.typeString),
+                               equalTo(testSpec.expectedType));
+               }
+       }
+
+       @Test
+       public void testSerializableParsing() {
+               if (testSpec.expectedType != null) {
+                       if (!hasRoot(testSpec.expectedType, UNRESOLVED) &&
+                                       
testSpec.expectedType.getChildren().stream().noneMatch(t -> hasRoot(t, 
UNRESOLVED))) {
+                               assertThat(
+                                       
LogicalTypeParser.parse(testSpec.expectedType.asSerializableString()),
+                                       equalTo(testSpec.expectedType));
+                       }
+               }
+       }
+
+       @Test
+       public void testErrorMessage() {
+               if (testSpec.expectedErrorMessage != null) {
+                       thrown.expect(ValidationException.class);
+                       thrown.expectMessage(testSpec.expectedErrorMessage);
+
+                       LogicalTypeParser.parse(testSpec.typeString);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class TestSpec {
+
+               private final String typeString;
+
+               private @Nullable LogicalType expectedType;
+
+               private @Nullable String expectedErrorMessage;
+
+               private TestSpec(String typeString) {
+                       this.typeString = typeString;
+               }
+
+               static TestSpec forString(String typeString) {
+                       return new TestSpec(typeString);
+               }
+
+               TestSpec expectType(LogicalType expectedType) {
+                       this.expectedType = expectedType;
+                       return this;
+               }
+
+               TestSpec expectErrorMessage(String expectedErrorMessage) {
+                       this.expectedErrorMessage = expectedErrorMessage;
+                       return this;
+               }
+       }
+
+       private static <T> AnyType<T> createAnyType(Class<T> clazz) {
+               return new AnyType<>(clazz, new KryoSerializer<>(clazz, new 
ExecutionConfig()));
+       }
+}

Reply via email to