This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d894d35  [FLINK-12799][table] Improve expression based TableSchema 
extraction from DataStream/DataSet
d894d35 is described below

commit d894d3517258191703a0a6dd6b42947c8a0d8d95
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Jun 13 13:32:31 2019 +0200

    [FLINK-12799][table] Improve expression based TableSchema extraction from 
DataStream/DataSet
---
 .../flink/table/typeutils/FieldInfoUtils.java      | 593 ++++++++++++++-------
 .../apache/flink/table/api/BatchTableEnvImpl.scala |  35 +-
 .../flink/table/api/StreamTableEnvImpl.scala       | 226 +-------
 .../batch/table/JavaTableEnvironmentITCase.java    |  18 +-
 .../table/validation/CalcValidationTest.scala      |   4 +-
 .../StreamTableEnvironmentValidationTest.scala     |  38 +-
 .../api/validation/InlineTableValidationTest.scala |   4 +-
 .../TableEnvironmentValidationTest.scala           |  32 +-
 8 files changed, 474 insertions(+), 476 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
index 093881c..1804891 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -18,27 +18,35 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.types.Row;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.Modifier;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -46,7 +54,10 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static java.lang.String.format;
-import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Utility methods for extracting names and indices of fields from different 
{@link TypeInformation}s.
@@ -56,15 +67,54 @@ public class FieldInfoUtils {
        private static final String ATOMIC_FIELD_NAME = "f0";
 
        /**
-        * Describes extracted fields and corresponding indices from a {@link 
TypeInformation}.
+        * Describes fields' names, indices and {@link DataType}s extracted 
from a {@link TypeInformation} and possibly
+        * transformed via {@link Expression} application. It is in fact a 
mapping between {@link TypeInformation} of an
+        * input and {@link TableSchema} of a {@link 
org.apache.flink.table.api.Table} that can be created out of it.
+        *
+        * @see FieldInfoUtils#getFieldsInfo(TypeInformation)
+        * @see FieldInfoUtils#getFieldsInfo(TypeInformation, Expression[])
         */
-       public static class FieldsInfo {
+       public static class TypeInfoSchema {
                private final String[] fieldNames;
                private final int[] indices;
-
-               FieldsInfo(String[] fieldNames, int[] indices) {
+               private final DataType[] fieldTypes;
+               private final boolean isRowtimeDefined;
+
+               TypeInfoSchema(
+                               String[] fieldNames,
+                               int[] indices,
+                               DataType[] fieldTypes,
+                               boolean isRowtimeDefined) {
+                       validateEqualLength(fieldNames, indices, fieldTypes);
+                       validateNamesUniqueness(fieldNames);
+
+                       this.isRowtimeDefined = isRowtimeDefined;
                        this.fieldNames = fieldNames;
                        this.indices = indices;
+                       this.fieldTypes = fieldTypes;
+               }
+
+               private void validateEqualLength(String[] fieldNames, int[] 
indices, DataType[] fieldTypes) {
+                       if (fieldNames.length != indices.length || 
indices.length != fieldTypes.length) {
+                               throw new 
TableException(String.format("Mismatched number of indices, names and types:\n" 
+
+                                       "Names: %s\n" +
+                                       "Indices: %s\n" +
+                                       "Types: %s", 
Arrays.toString(fieldNames), Arrays.toString(indices), 
Arrays.toString(fieldTypes)));
+                       }
+               }
+
+               private void validateNamesUniqueness(String[] fieldNames) {
+                       // check uniqueness of field names
+                       Set<String> duplicatedNames = 
findDuplicates(fieldNames);
+                       if (duplicatedNames.size() != 0) {
+
+                               throw new ValidationException(String.format(
+                                       "Field names must be unique.\n" +
+                                               "List of duplicate fields: 
[%s].\n" +
+                                               "List of all fields: [%s].",
+                                       String.join(", ", duplicatedNames),
+                                       String.join(", ", fieldNames)));
+                       }
                }
 
                public String[] getFieldNames() {
@@ -74,6 +124,18 @@ public class FieldInfoUtils {
                public int[] getIndices() {
                        return indices;
                }
+
+               public DataType[] getFieldTypes() {
+                       return fieldTypes;
+               }
+
+               public boolean isRowtimeDefined() {
+                       return isRowtimeDefined;
+               }
+
+               public TableSchema toTableSchema() {
+                       return TableSchema.builder().fields(fieldNames, 
fieldTypes).build();
+               }
        }
 
        /**
@@ -111,55 +173,116 @@ public class FieldInfoUtils {
        }
 
        /**
-        * Returns field names and field positions for a given {@link 
TypeInformation}.
+        * Returns a {@link TypeInfoSchema} for a given {@link TypeInformation}.
         *
-        * @param inputType The TypeInformation extract the field names and 
positions from.
+        * @param inputType The TypeInformation to extract the mapping from.
         * @param <A> The type of the TypeInformation.
-        * @return A tuple of two arrays holding the field names and 
corresponding field positions.
+        * @return A description of the input that enables creation of a {@link 
TableSchema}.
+        * @see TypeInfoSchema
         */
-       public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> 
inputType) {
+       public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A> 
inputType) {
 
                if (inputType instanceof GenericTypeInfo && 
inputType.getTypeClass() == Row.class) {
-                       throw new TableException(
+                       throw new ValidationException(
                                "An input of GenericTypeInfo<Row> cannot be 
converted to Table. " +
                                        "Please specify the type of the input 
with a RowTypeInfo.");
                } else {
-                       return new FieldsInfo(getFieldNames(inputType), 
getFieldIndices(inputType));
+                       return new TypeInfoSchema(
+                               getFieldNames(inputType),
+                               getFieldIndices(inputType),
+                               
fromLegacyInfoToDataType(getFieldTypes(inputType)),
+                               false);
                }
        }
 
        /**
-        * Returns field names and field positions for a given {@link 
TypeInformation} and array of
-        * {@link Expression}. It does not handle time attributes but considers 
them in indices.
+        * Returns a {@link TypeInfoSchema} for a given {@link TypeInformation}.
+        * It gives control of the process of mapping {@link TypeInformation} 
to {@link TableSchema}
+        * (via {@link TypeInfoSchema}).
         *
-        * @param inputType The {@link TypeInformation} against which the 
{@link Expression}s are evaluated.
-        * @param exprs     The expressions that define the field names.
+        * <p>Possible operations via the expressions include:
+        * <ul>
+        *     <li>specifying rowtime & proctime attributes via .proctime, 
.rowtime
+        *          <ul>
+        *              <li>There can be only a single rowtime and/or a single 
proctime attribute</li>
+        *              <li>Proctime attribute can only be appended to the end 
of the expression list</li>
+        *              <li>Rowtime attribute can replace an input field if the 
input field has a compatible type.
+        *              See {@link TimestampType}.</li>
+        *          </ul>
+        *     </li>
+        *     <li>renaming fields by position (this cannot be mixed with 
referencing by name)</li>
+        *     <li>renaming & projecting fields by name (this cannot be mixed 
with referencing by position)</li>
+        * </ul>
+        *
+        * @param inputType The TypeInformation to extract the mapping from.
+        * @param expressions Expressions to apply while extracting the mapping.
         * @param <A> The type of the TypeInformation.
-        * @return A tuple of two arrays holding the field names and 
corresponding field positions.
+        * @return A description of the input that enables creation of a {@link 
TableSchema}.
+        * @see TypeInfoSchema
         */
-       public static <A> FieldsInfo getFieldsInfo(TypeInformation<A> 
inputType, Expression[] exprs) {
+       public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A> 
inputType, Expression[] expressions) {
                validateInputTypeInfo(inputType);
 
-               final Set<FieldInfo> fieldInfos;
+               final List<FieldInfo> fieldInfos = 
extractFieldInformation(inputType, expressions);
+
+               validateNoStarReference(fieldInfos);
+               boolean isRowtimeAttribute = 
checkIfRowtimeAttribute(fieldInfos);
+               validateAtMostOneProctimeAttribute(fieldInfos);
+
+               String[] fieldNames = 
fieldInfos.stream().map(FieldInfo::getFieldName).toArray(String[]::new);
+               int[] fieldIndices = 
fieldInfos.stream().mapToInt(FieldInfo::getIndex).toArray();
+               DataType[] dataTypes = 
fieldInfos.stream().map(FieldInfo::getType).toArray(DataType[]::new);
+
+               return new TypeInfoSchema(fieldNames, fieldIndices, dataTypes, 
isRowtimeAttribute);
+       }
+
+       private static void validateNoStarReference(List<FieldInfo> fieldInfos) 
{
+               if (fieldInfos.stream().anyMatch(info -> 
info.getFieldName().equals("*"))) {
+                       throw new ValidationException("Field name can not be 
'*'.");
+               }
+       }
+
+       private static <A> List<FieldInfo> extractFieldInformation(
+               TypeInformation<A> inputType,
+               Expression[] exprs) {
+               final List<FieldInfo> fieldInfos;
                if (inputType instanceof GenericTypeInfo && 
inputType.getTypeClass() == Row.class) {
-                       throw new TableException(
+                       throw new ValidationException(
                                "An input of GenericTypeInfo<Row> cannot be 
converted to Table. " +
                                        "Please specify the type of the input 
with a RowTypeInfo.");
                } else if (inputType instanceof TupleTypeInfoBase) {
-                       fieldInfos = 
extractFieldInfosFromTupleType((CompositeType) inputType, exprs);
+                       fieldInfos = 
extractFieldInfosFromTupleType((TupleTypeInfoBase<?>) inputType, exprs);
                } else if (inputType instanceof PojoTypeInfo) {
-                       fieldInfos = 
extractFieldInfosByNameReference((CompositeType) inputType, exprs);
+                       fieldInfos = 
extractFieldInfosByNameReference((CompositeType<?>) inputType, exprs);
                } else {
-                       fieldInfos = extractFieldInfoFromAtomicType(exprs);
+                       fieldInfos = extractFieldInfoFromAtomicType(inputType, 
exprs);
                }
+               return fieldInfos;
+       }
 
-               if (fieldInfos.stream().anyMatch(info -> 
info.getFieldName().equals("*"))) {
-                       throw new TableException("Field name can not be '*'.");
+       private static void validateAtMostOneProctimeAttribute(List<FieldInfo> 
fieldInfos) {
+               List<FieldInfo> proctimeAttributes = fieldInfos.stream()
+                       .filter(FieldInfoUtils::isProctimeField)
+                       .collect(Collectors.toList());
+
+               if (proctimeAttributes.size() > 1) {
+                       throw new ValidationException(
+                               "The proctime attribute can only be defined 
once in a table schema. Duplicated proctime attributes: " +
+                                       proctimeAttributes);
                }
+       }
 
-               String[] fieldNames = 
fieldInfos.stream().map(FieldInfo::getFieldName).toArray(String[]::new);
-               int[] fieldIndices = 
fieldInfos.stream().mapToInt(FieldInfo::getIndex).toArray();
-               return new FieldsInfo(fieldNames, fieldIndices);
+       private static boolean checkIfRowtimeAttribute(List<FieldInfo> 
fieldInfos) {
+               List<FieldInfo> rowtimeAttributes = fieldInfos.stream()
+                       .filter(FieldInfoUtils::isRowtimeField)
+                       .collect(Collectors.toList());
+
+               if (rowtimeAttributes.size() > 1) {
+                       throw new ValidationException(
+                               "The rowtime attribute can only be defined once 
in a table schema. Duplicated rowtime attributes: " +
+                                       rowtimeAttributes);
+               }
+               return rowtimeAttributes.size() > 0;
        }
 
        /**
@@ -180,7 +303,7 @@ public class FieldInfoUtils {
                }
 
                if (Arrays.asList(fieldNames).contains("*")) {
-                       throw new TableException("Field name can not be '*'.");
+                       throw new ValidationException("Field name can not be 
'*'.");
                }
 
                return fieldNames;
@@ -190,14 +313,14 @@ public class FieldInfoUtils {
         * Validate if class represented by the typeInfo is static and globally 
accessible.
         *
         * @param typeInfo type to check
-        * @throws TableException if type does not meet these criteria
+        * @throws ValidationException if type does not meet these criteria
         */
        public static <A> void validateInputTypeInfo(TypeInformation<A> 
typeInfo) {
                Class<A> clazz = typeInfo.getTypeClass();
                if ((clazz.isMemberClass() && 
!Modifier.isStatic(clazz.getModifiers())) ||
                        !Modifier.isPublic(clazz.getModifiers()) ||
                        clazz.getCanonicalName() == null) {
-                       throw new TableException(format(
+                       throw new ValidationException(format(
                                "Class '%s' described in type information '%s' 
must be " +
                                "static and globally accessible.", clazz, 
typeInfo));
                }
@@ -225,7 +348,7 @@ public class FieldInfoUtils {
                final TypeInformation<?>[] fieldTypes;
                if (inputType instanceof CompositeType) {
                        int arity = inputType.getArity();
-                       CompositeType ct = (CompositeType) inputType;
+                       CompositeType ct = (CompositeType<?>) inputType;
                        fieldTypes = IntStream.range(0, 
arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
                } else {
                        fieldTypes = new TypeInformation[]{inputType};
@@ -234,144 +357,62 @@ public class FieldInfoUtils {
                return fieldTypes;
        }
 
-       /**
-        * Derives {@link TableSchema} out of a {@link TypeInformation}. It is 
complementary to other
-        * methods in this class. This also performs translation from time 
indicator markers such as
-        * {@link TimeIndicatorTypeInfo#ROWTIME_STREAM_MARKER} etc. to a 
corresponding
-        * {@link TimeIndicatorTypeInfo}.
-        *
-        * @param typeInfo input type info to calculate fields type infos from
-        * @param fieldIndexes indices within the typeInfo of the resulting 
Table schema
-        * @param fieldNames names of the fields of the resulting schema
-        * @return calculates resulting schema
-        */
-       public static TableSchema calculateTableSchema(
-               TypeInformation<?> typeInfo,
-               int[] fieldIndexes,
-               String[] fieldNames) {
-
-               if (fieldIndexes.length != fieldNames.length) {
-                       throw new TableException(String.format(
-                               "Number of field names and field indexes must 
be equal.\n" +
-                                       "Number of names is %s, number of 
indexes is %s.\n" +
-                                       "List of column names: %s.\n" +
-                                       "List of column indexes: %s.",
-                               fieldNames.length,
-                               fieldIndexes.length,
-                               String.join(", ", fieldNames),
-                               
Arrays.stream(fieldIndexes).mapToObj(Integer::toString).collect(Collectors.joining(",
 "))));
-               }
-
-               // check uniqueness of field names
-               Set<String> duplicatedNames = findDuplicates(fieldNames);
-               if (duplicatedNames.size() != 0) {
-
-                       throw new TableException(String.format(
-                               "Field names must be unique.\n" +
-                                       "List of duplicate fields: [%s].\n" +
-                                       "List of all fields: [%s].",
-                               String.join(", ", duplicatedNames),
-                               String.join(", ", fieldNames)));
-               }
-
-               final TypeInformation[] types;
-               long fieldIndicesCount = Arrays.stream(fieldIndexes).filter(i 
-> i >= 0).count();
-               if (typeInfo instanceof CompositeType) {
-                       CompositeType ct = (CompositeType) typeInfo;
-                       // it is ok to leave out fields
-                       if (fieldIndicesCount > ct.getArity()) {
-                               throw new TableException(String.format(
-                                       "Arity of type (%s) must not be greater 
than number of field names %s.",
-                                       Arrays.toString(ct.getFieldNames()),
-                                       Arrays.toString(fieldNames)));
-                       }
-
-                       types = Arrays.stream(fieldIndexes)
-                               .mapToObj(idx -> 
extractTimeMarkerType(idx).orElseGet(() -> ct.getTypeAt(idx)))
-                               .toArray(TypeInformation[]::new);
-               } else {
-                       if (fieldIndicesCount > 1) {
-                               throw new TableException(
-                                       "Non-composite input type may have only 
a single field and its index must be 0.");
-                       }
-
-                       types = Arrays.stream(fieldIndexes)
-                               .mapToObj(idx -> 
extractTimeMarkerType(idx).orElse(typeInfo))
-                               .toArray(TypeInformation[]::new);
-               }
-
-               return new TableSchema(fieldNames, types);
-       }
-
        /* Utility methods */
 
-       private static Optional<TypeInformation<?>> extractTimeMarkerType(int 
idx) {
-               switch (idx) {
-                       case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER:
-                               return 
Optional.of(TimeIndicatorTypeInfo.ROWTIME_INDICATOR);
-                       case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER:
-                               return 
Optional.of(TimeIndicatorTypeInfo.PROCTIME_INDICATOR);
-                       case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER:
-                       case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER:
-                               return Optional.of(Types.SQL_TIMESTAMP);
-                       default:
-                               return Optional.empty();
-               }
-       }
-
-       private static Set<FieldInfo> 
extractFieldInfoFromAtomicType(Expression[] exprs) {
-               boolean referenced = false;
-               FieldInfo fieldInfo = null;
-               for (Expression expr : exprs) {
+       private static List<FieldInfo> 
extractFieldInfoFromAtomicType(TypeInformation<?> atomicType, Expression[] 
exprs) {
+               List<FieldInfo> fields = new ArrayList<>(exprs.length);
+               boolean alreadyReferenced = false;
+               for (int i = 0; i < exprs.length; i++) {
+                       Expression expr = exprs[i];
                        if (expr instanceof UnresolvedReferenceExpression) {
-                               if (referenced) {
-                                       throw new TableException("Only the 
first field can reference an atomic type.");
-                               } else {
-                                       referenced = true;
-                                       fieldInfo = new 
FieldInfo(((UnresolvedReferenceExpression) expr).getName(), 0);
+                               if (alreadyReferenced) {
+                                       throw new ValidationException("Too many 
fields referenced from an atomic type.");
                                }
-                       } else if (!isTimeAttribute(expr)) { // IGNORE Time 
attributes
-                               throw new TableException("Field reference 
expression expected.");
-                       }
-               }
 
-               if (fieldInfo != null) {
-                       return Collections.singleton(fieldInfo);
+                               alreadyReferenced = true;
+                               String name = ((UnresolvedReferenceExpression) 
expr).getName();
+                               fields.add(new FieldInfo(name, i, 
fromLegacyInfoToDataType(atomicType)));
+                       } else if (isRowTimeExpression(expr)) {
+                               UnresolvedReferenceExpression reference = 
getChildAsReference(expr);
+                               fields.add(createTimeAttributeField(reference, 
TimestampKind.ROWTIME, null));
+                       } else if (isProcTimeExpression(expr)) {
+                               UnresolvedReferenceExpression reference = 
getChildAsReference(expr);
+                               fields.add(createTimeAttributeField(reference, 
TimestampKind.PROCTIME, null));
+                       } else {
+                               throw new ValidationException("Field reference 
expression expected.");
+                       }
                }
-
-               return Collections.emptySet();
-       }
-
-       private static <A> Set<FieldInfo> 
extractFieldInfosByNameReference(CompositeType inputType, Expression[] exprs) {
-               ExprToFieldInfo exprToFieldInfo = new 
ExprToFieldInfo(inputType);
-               return Arrays.stream(exprs)
-                       .map(expr -> expr.accept(exprToFieldInfo))
-                       .filter(Optional::isPresent)
-                       .map(Optional::get)
-                       .collect(Collectors.toCollection(LinkedHashSet::new));
+               return fields;
        }
 
-       private static <A> Set<FieldInfo> 
extractFieldInfosFromTupleType(CompositeType inputType, Expression[] exprs) {
-               boolean isRefByPos = isReferenceByPosition((CompositeType<?>) 
inputType, exprs);
+       private static List<FieldInfo> 
extractFieldInfosFromTupleType(TupleTypeInfoBase<?> inputType, Expression[] 
exprs) {
+               boolean isRefByPos = isReferenceByPosition(inputType, exprs);
 
                if (isRefByPos) {
                        return IntStream.range(0, exprs.length)
-                               .mapToObj(idx -> exprs[idx].accept(new 
IndexedExprToFieldInfo(idx)))
-                               .filter(Optional::isPresent)
-                               .map(Optional::get)
-                               
.collect(Collectors.toCollection(LinkedHashSet::new));
+                               .mapToObj(idx -> exprs[idx].accept(new 
IndexedExprToFieldInfo(inputType, idx)))
+                               .collect(Collectors.toList());
                } else {
                        return extractFieldInfosByNameReference(inputType, 
exprs);
                }
        }
 
+       private static List<FieldInfo> 
extractFieldInfosByNameReference(CompositeType<?> inputType, Expression[] 
exprs) {
+               ExprToFieldInfo exprToFieldInfo = new 
ExprToFieldInfo(inputType);
+               return Arrays.stream(exprs)
+                       .map(expr -> expr.accept(exprToFieldInfo))
+                       .collect(Collectors.toList());
+       }
+
        private static class FieldInfo {
                private final String fieldName;
                private final int index;
+               private final DataType type;
 
-               FieldInfo(String fieldName, int index) {
+               FieldInfo(String fieldName, int index, DataType type) {
                        this.fieldName = fieldName;
                        this.index = index;
+                       this.type = type;
                }
 
                public String getFieldName() {
@@ -381,51 +422,90 @@ public class FieldInfoUtils {
                public int getIndex() {
                        return index;
                }
+
+               public DataType getType() {
+                       return type;
+               }
        }
 
-       private static class IndexedExprToFieldInfo extends 
ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+       private static class IndexedExprToFieldInfo extends 
ApiExpressionDefaultVisitor<FieldInfo> {
 
+               private final CompositeType<?> inputType;
                private final int index;
 
-               private IndexedExprToFieldInfo(int index) {
+               private IndexedExprToFieldInfo(CompositeType<?> inputType, int 
index) {
+                       this.inputType = inputType;
                        this.index = index;
                }
 
                @Override
-               public Optional<FieldInfo> 
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+               public FieldInfo 
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
                        String fieldName = unresolvedReference.getName();
-                       return Optional.of(new FieldInfo(fieldName, index));
+                       return new FieldInfo(fieldName, index, 
fromLegacyInfoToDataType(getTypeAt(unresolvedReference)));
                }
 
                @Override
-               public Optional<FieldInfo> visitCall(CallExpression call) {
+               public FieldInfo visitCall(CallExpression call) {
                        if (call.getFunctionDefinition() == 
BuiltInFunctionDefinitions.AS) {
-                               List<Expression> children = call.getChildren();
-                               Expression origExpr = children.get(0);
-                               String newName = 
ExpressionUtils.extractValue(children.get(1), String.class)
-                                       .orElseThrow(() ->
-                                               new TableException("Alias 
expects string literal as new name. Got: " + children.get(1)));
-
-                               if (origExpr instanceof 
UnresolvedReferenceExpression) {
-                                       throw new TableException(
-                                               format("Alias '%s' is not 
allowed if other fields are referenced by position.", newName));
-                               } else if (isTimeAttribute(origExpr)) {
-                                       return Optional.empty();
-                               }
-                       } else if (isTimeAttribute(call)) {
-                               return Optional.empty();
+                               return visitAlias(call);
+                       } else if (isRowTimeExpression(call)) {
+                               validateRowtimeReplacesCompatibleType(call);
+                               return 
createTimeAttributeField(getChildAsReference(call), TimestampKind.ROWTIME, 
null);
+                       } else if (isProcTimeExpression(call)) {
+                               validateProcTimeAttributeAppended(call);
+                               return 
createTimeAttributeField(getChildAsReference(call), TimestampKind.PROCTIME, 
null);
                        }
 
                        return defaultMethod(call);
                }
 
+               private FieldInfo visitAlias(CallExpression call) {
+                       List<Expression> children = call.getChildren();
+                       String newName = extractAlias(children.get(1));
+
+                       Expression child = children.get(0);
+                       if (isProcTimeExpression(child)) {
+                               validateProcTimeAttributeAppended(call);
+                               return 
createTimeAttributeField(getChildAsReference(child), TimestampKind.PROCTIME, 
newName);
+                       } else {
+                               throw new ValidationException(
+                                       format("Alias '%s' is not allowed if 
other fields are referenced by position.", newName));
+                       }
+               }
+
+               private void 
validateRowtimeReplacesCompatibleType(CallExpression call) {
+                       if (index < inputType.getArity()) {
+                               checkRowtimeType(getTypeAt(call));
+                       }
+               }
+
+               private void validateProcTimeAttributeAppended(CallExpression 
call) {
+                       if (index < inputType.getArity()) {
+                               throw new 
ValidationException(String.format("The proctime attribute can only be appended 
to the" +
+                                       " table schema and not replace an 
existing field. Please move '%s' to the end of the" +
+                                       " schema.", call));
+                       }
+               }
+
+               private TypeInformation<Object> getTypeAt(Expression expr) {
+                       if (index >= inputType.getArity()) {
+                               throw new ValidationException(String.format(
+                                       "Number of expressions does not match 
number of input fields.\n" +
+                                               "Available fields: %s\n" +
+                                               "Could not map: %s",
+                                       
Arrays.toString(inputType.getFieldNames()),
+                                       expr));
+                       }
+                       return inputType.getTypeAt(index);
+               }
+
                @Override
-               protected Optional<FieldInfo> defaultMethod(Expression 
expression) {
-                       throw new TableException("Field reference expression or 
alias on field expression expected.");
+               protected FieldInfo defaultMethod(Expression expression) {
+                       throw new ValidationException("Field reference 
expression or alias on field expression expected.");
                }
        }
 
-       private static class ExprToFieldInfo extends 
ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+       private static class ExprToFieldInfo extends 
ApiExpressionDefaultVisitor<FieldInfo> {
 
                private final CompositeType ct;
 
@@ -433,53 +513,136 @@ public class FieldInfoUtils {
                        this.ct = ct;
                }
 
+               private ValidationException fieldNotFound(String name) {
+                       return new ValidationException(format(
+                               "%s is not a field of type %s. Expected: %s",
+                               name,
+                               ct,
+                               String.join(", ", ct.getFieldNames())));
+               }
+
                @Override
-               public Optional<FieldInfo> 
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
-                       String fieldName = unresolvedReference.getName();
-                       return referenceByName(fieldName, ct).map(idx -> new 
FieldInfo(fieldName, idx));
+               public FieldInfo 
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+                       return createFieldInfo(unresolvedReference, null);
                }
 
                @Override
-               public Optional<FieldInfo> visitCall(CallExpression call) {
+               public FieldInfo visitCall(CallExpression call) {
                        if (call.getFunctionDefinition() == 
BuiltInFunctionDefinitions.AS) {
-                               List<Expression> children = call.getChildren();
-                               Expression origExpr = children.get(0);
-                               String newName = 
ExpressionUtils.extractValue(children.get(1), String.class)
-                                       .orElseThrow(() ->
-                                               new TableException("Alias 
expects string literal as new name. Got: " + children.get(1)));
-
-                               if (origExpr instanceof 
UnresolvedReferenceExpression) {
-                                       return 
referenceByName(((UnresolvedReferenceExpression) origExpr).getName(), ct)
-                                               .map(idx -> new 
FieldInfo(newName, idx));
-                               } else if (isTimeAttribute(origExpr)) {
-                                       return Optional.empty();
-                               }
-                       } else if (isTimeAttribute(call)) {
-                               return Optional.empty();
+                               return visitAlias(call);
+                       } else if (isRowTimeExpression(call)) {
+                               return createRowtimeFieldInfo(call, null);
+                       } else if (isProcTimeExpression(call)) {
+                               return createProctimeFieldInfo(call, null);
                        }
 
                        return defaultMethod(call);
                }
 
+               private FieldInfo visitAlias(CallExpression call) {
+                       List<Expression> children = call.getChildren();
+                       String newName = extractAlias(children.get(1));
+
+                       Expression child = children.get(0);
+                       if (child instanceof UnresolvedReferenceExpression) {
+                               return 
createFieldInfo((UnresolvedReferenceExpression) child, newName);
+                       } else if (isRowTimeExpression(child)) {
+                               return createRowtimeFieldInfo(child, newName);
+                       } else if (isProcTimeExpression(child)) {
+                               return createProctimeFieldInfo(child, newName);
+                       } else {
+                               return defaultMethod(call);
+                       }
+               }
+
+               private FieldInfo createFieldInfo(UnresolvedReferenceExpression 
unresolvedReference, @Nullable String alias) {
+                       String fieldName = unresolvedReference.getName();
+                       return referenceByName(fieldName, ct)
+                               .map(idx -> new FieldInfo(
+                                       alias != null ? alias : fieldName,
+                                       idx,
+                                       
fromLegacyInfoToDataType(ct.getTypeAt(idx))))
+                               .orElseThrow(() -> fieldNotFound(fieldName));
+               }
+
+               private FieldInfo createProctimeFieldInfo(Expression 
expression, @Nullable String alias) {
+                       UnresolvedReferenceExpression reference = 
getChildAsReference(expression);
+                       String originalName = reference.getName();
+                       validateProctimeDoesNotReplaceField(originalName);
+
+                       return createTimeAttributeField(reference, 
TimestampKind.PROCTIME, alias);
+               }
+
+               private void validateProctimeDoesNotReplaceField(String 
originalName) {
+                       if (referenceByName(originalName, ct).isPresent()) {
+                               throw new ValidationException(String.format(
+                                       "The proctime attribute '%s' must not 
replace an existing field.",
+                                       originalName));
+                       }
+               }
+
+               private FieldInfo createRowtimeFieldInfo(Expression expression, 
@Nullable String alias) {
+                       UnresolvedReferenceExpression reference = 
getChildAsReference(expression);
+                       String originalName = reference.getName();
+                       verifyReferencesValidField(originalName, alias);
+
+                       return createTimeAttributeField(reference, 
TimestampKind.ROWTIME, alias);
+               }
+
+               private void verifyReferencesValidField(String origName, 
@Nullable String alias) {
+                       Optional<Integer> refId = referenceByName(origName, ct);
+                       if (refId.isPresent()) {
+                               checkRowtimeType(ct.getTypeAt(refId.get()));
+                       } else if (alias != null) {
+                               throw new 
ValidationException(String.format("Alias '%s' must reference an existing 
field.", alias));
+                       }
+               }
+
                @Override
-               protected Optional<FieldInfo> defaultMethod(Expression 
expression) {
-                       throw new TableException("Field reference expression or 
alias on field expression expected.");
+               protected FieldInfo defaultMethod(Expression expression) {
+                       throw new ValidationException("Field reference 
expression or alias on field expression expected.");
                }
        }
 
-       private static boolean isTimeAttribute(Expression origExpr) {
+       private static String extractAlias(Expression aliasExpr) {
+               return ExpressionUtils.extractValue(aliasExpr, String.class)
+                       .orElseThrow(() -> new TableException("Alias expects 
string literal as new name. Got: " + aliasExpr));
+       }
+
+       private static void checkRowtimeType(TypeInformation<?> type) {
+               if (!(type.equals(Types.LONG()) || type instanceof 
SqlTimeTypeInfo)) {
+                       throw new ValidationException(
+                               "The rowtime attribute can only replace a field 
with a valid time type, " +
+                                       "such as Timestamp or Long. But was: " 
+ type);
+               }
+       }
+
+       private static boolean isRowtimeField(FieldInfo field) {
+               DataType type = field.getType();
+               return hasRoot(type.getLogicalType(), 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
+                       isRowtimeAttribute(type.getLogicalType());
+       }
+
+       private static boolean isProctimeField(FieldInfo field) {
+               DataType type = field.getType();
+               return hasRoot(type.getLogicalType(), 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
+                       isProctimeAttribute(type.getLogicalType());
+       }
+
+       private static boolean isRowTimeExpression(Expression origExpr) {
+               return origExpr instanceof CallExpression &&
+                       ((CallExpression) origExpr).getFunctionDefinition() == 
BuiltInFunctionDefinitions.ROWTIME;
+       }
+
+       private static boolean isProcTimeExpression(Expression origExpr) {
                return origExpr instanceof CallExpression &&
-                       TIME_ATTRIBUTES.contains(((CallExpression) 
origExpr).getFunctionDefinition());
+                       ((CallExpression) origExpr).getFunctionDefinition() == 
BuiltInFunctionDefinitions.PROCTIME;
        }
 
        private static Optional<Integer> referenceByName(String name, 
CompositeType<?> ct) {
                int inputIdx = ct.getFieldIndex(name);
                if (inputIdx < 0) {
-                       throw new TableException(format(
-                               "%s is not a field of type %s. Expected: %s}",
-                               name,
-                               ct,
-                               String.join(", ", ct.getFieldNames())));
+                       return Optional.empty();
                } else {
                        return Optional.of(inputIdx);
                }
@@ -500,6 +663,38 @@ public class FieldInfoUtils {
                return duplicates;
        }
 
+       private static FieldInfo createTimeAttributeField(
+                       UnresolvedReferenceExpression reference,
+                       TimestampKind kind,
+                       @Nullable String alias) {
+               final int idx;
+               if (kind == TimestampKind.PROCTIME) {
+                       idx = TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
+               } else {
+                       idx = TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
+               }
+
+               String originalName = reference.getName();
+               return new FieldInfo(
+                       alias != null ? alias : originalName,
+                       idx,
+                       createTimeIndicatorType(kind));
+       }
+
+       private static UnresolvedReferenceExpression 
getChildAsReference(Expression expression) {
+               Expression child = expression.getChildren().get(0);
+               if (child instanceof UnresolvedReferenceExpression) {
+                       return (UnresolvedReferenceExpression) child;
+               }
+
+               throw new ValidationException("Field reference expression 
expected.");
+       }
+
+       private static DataType createTimeIndicatorType(TimestampKind kind) {
+               return new AtomicDataType(new TimestampType(true, kind, 3))
+                       .bridgedTo(java.sql.Timestamp.class);
+       }
+
        private FieldInfoUtils() {
        }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index 7c7eefe..17745ec 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, 
ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
-import org.apache.flink.table.expressions.{CallExpression, Expression}
+import org.apache.flink.table.expressions.{CallExpression, Expression, 
ExpressionDefaultVisitor}
 import org.apache.flink.table.operations.DataSetQueryOperation
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -42,10 +42,11 @@ import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{BatchTableSource, 
InputFormatTableSource, TableSource, TableSourceUtil}
 import 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema, 
getFieldsInfo, validateInputTypeInfo}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, 
validateInputTypeInfo}
 import org.apache.flink.table.utils.TableConnectorUtils
 import org.apache.flink.types.Row
 
+import _root_.scala.collection.JavaConverters._
 /**
   * The abstract base class for the implementation of batch TableEnvironments.
   *
@@ -206,23 +207,35 @@ abstract class BatchTableEnvImpl(
 
     val fieldsInfo = fields match {
       case Some(f) =>
-        if (f.exists(f =>
-          f.isInstanceOf[CallExpression] &&
-            
TIME_ATTRIBUTES.contains(f.asInstanceOf[CallExpression].getFunctionDefinition)))
 {
-          throw new ValidationException(
-            ".rowtime and .proctime time indicators are not allowed in a batch 
environment.")
-        }
-
+        checkNoTimeAttributes(f)
         getFieldsInfo[T](inputType, f)
+
       case None => getFieldsInfo[T](inputType)
     }
 
-    val tableOperation = new DataSetQueryOperation[T](dataSet,
+    val tableOperation = new DataSetQueryOperation[T](
+      dataSet,
       fieldsInfo.getIndices,
-      calculateTableSchema(inputType, fieldsInfo.getIndices, 
fieldsInfo.getFieldNames))
+      fieldsInfo.toTableSchema)
     tableOperation
   }
 
+  private def checkNoTimeAttributes[T](f: Array[Expression]) = {
+    if (f.exists(f =>
+      f.accept(new ExpressionDefaultVisitor[Boolean] {
+
+        override def visitCall(call: CallExpression): Boolean = {
+          TIME_ATTRIBUTES.contains(call.getFunctionDefinition) ||
+            call.getChildren.asScala.exists(_.accept(this))
+        }
+
+        override protected def defaultMethod(expression: Expression): Boolean 
= false
+      }))) {
+      throw new ValidationException(
+        ".rowtime and .proctime time indicators are not allowed in a batch 
environment.")
+    }
+  }
+
   /**
     * Returns the built-in normalization rules that are defined by the 
environment.
     */
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index c351a46..49103f7 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -28,7 +28,6 @@ import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -52,8 +51,7 @@ import org.apache.flink.table.runtime.{CRowMapRunner, 
OutputRowtimeProcessFuncti
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceUtil}
 import 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema, 
getFieldsInfo, isReferenceByPosition}
-import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo
 
 import _root_.scala.collection.JavaConverters._
 
@@ -325,240 +323,32 @@ abstract class StreamTableEnvImpl(
     val streamType = dataStream.getType
 
     // get field names and types for all non-replaced fields
-    val (indices, names) = fields match {
+    val fieldsInfo = fields match {
       case Some(f) =>
         // validate and extract time attributes
         val fieldsInfo = getFieldsInfo[T](streamType, f)
-        val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, 
f)
 
         // check if event-time is enabled
-        if (rowtime.isDefined &&
+        if (fieldsInfo.isRowtimeDefined &&
           execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) 
{
-          throw new TableException(
+          throw new ValidationException(
             s"A rowtime attribute requires an EventTime time characteristic in 
stream environment" +
               s". But is: ${execEnv.getStreamTimeCharacteristic}")
         }
 
-        // adjust field indexes and field names
-        val indexesWithIndicatorFields = adjustFieldIndexes(
-          fieldsInfo.getIndices,
-          rowtime,
-          proctime)
-        val namesWithIndicatorFields = adjustFieldNames(
-          fieldsInfo.getFieldNames,
-          rowtime,
-          proctime)
-
-        (indexesWithIndicatorFields, namesWithIndicatorFields)
+        fieldsInfo
       case None =>
-        val fieldsInfo = getFieldsInfo[T](streamType)
-        (fieldsInfo.getIndices, fieldsInfo.getFieldNames)
+        getFieldsInfo[T](streamType)
     }
 
     val dataStreamTable = new DataStreamQueryOperation(
       dataStream,
-      indices,
-      calculateTableSchema(streamType, indices, names))
+      fieldsInfo.getIndices,
+      fieldsInfo.toTableSchema)
     dataStreamTable
   }
 
   /**
-    * Checks for at most one rowtime and proctime attribute.
-    * Returns the time attributes.
-    *
-    * @return rowtime attribute and proctime attribute
-    */
-  private def validateAndExtractTimeAttributes(
-    streamType: TypeInformation[_],
-    exprs: Array[Expression])
-  : (Option[(Int, String)], Option[(Int, String)]) = {
-
-    val (isRefByPos, fieldTypes) = streamType match {
-      case c: CompositeType[_] =>
-        // determine schema definition mode (by position or by name)
-        (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => 
c.getTypeAt(i)).toArray)
-      case t: TypeInformation[_] =>
-        (false, Array(t))
-    }
-
-    var fieldNames: List[String] = Nil
-    var rowtime: Option[(Int, String)] = None
-    var proctime: Option[(Int, String)] = None
-
-    def checkRowtimeType(t: TypeInformation[_]): Unit = {
-      if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
-        throw new TableException(
-          s"The rowtime attribute can only replace a field with a valid time 
type, " +
-          s"such as Timestamp or Long. But was: $t")
-      }
-    }
-
-    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit 
= {
-      if (rowtime.isDefined) {
-        throw new TableException(
-          "The rowtime attribute can only be defined once in a table schema.")
-      } else {
-        // if the fields are referenced by position,
-        // it is possible to replace an existing field or append the time 
attribute at the end
-        if (isRefByPos) {
-          // aliases are not permitted
-          if (origName.isDefined) {
-            throw new TableException(
-              s"Invalid alias '${origName.get}' because fields are referenced 
by position.")
-          }
-          // check type of field that is replaced
-          if (idx < fieldTypes.length) {
-            checkRowtimeType(fieldTypes(idx))
-          }
-        }
-        // check reference-by-name
-        else {
-          val aliasOrName = origName.getOrElse(name)
-          streamType match {
-            // both alias and reference must have a valid type if they replace 
a field
-            case ct: CompositeType[_] if ct.hasField(aliasOrName) =>
-              val t = ct.getTypeAt(ct.getFieldIndex(aliasOrName))
-              checkRowtimeType(t)
-            // alias could not be found
-            case _ if origName.isDefined =>
-              throw new TableException(s"Alias '${origName.get}' must 
reference an existing field.")
-            case _ => // ok
-          }
-        }
-
-        rowtime = Some(idx, name)
-      }
-    }
-
-    def extractProctime(idx: Int, name: String): Unit = {
-      if (proctime.isDefined) {
-          throw new TableException(
-            "The proctime attribute can only be defined once in a table 
schema.")
-      } else {
-        // if the fields are referenced by position,
-        // it is only possible to append the time attribute at the end
-        if (isRefByPos) {
-
-          // check that proctime is only appended
-          if (idx < fieldTypes.length) {
-            throw new TableException(
-              "The proctime attribute can only be appended to the table schema 
and not replace " +
-                s"an existing field. Please move '$name' to the end of the 
schema.")
-          }
-        }
-        // check reference-by-name
-        else {
-          streamType match {
-            // proctime attribute must not replace a field
-            case ct: CompositeType[_] if ct.hasField(name) =>
-              throw new TableException(
-                s"The proctime attribute '$name' must not replace an existing 
field.")
-            case _ => // ok
-          }
-        }
-        proctime = Some(idx, name)
-      }
-    }
-
-    val bridgedFields = exprs.map(expressionBridge.bridge).toArray[Expression]
-    bridgedFields.zipWithIndex.foreach {
-      case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
-        extractRowtime(idx, name, None)
-
-      case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name, 
_), idx) =>
-        extractRowtime(idx, name, Some(origName))
-
-      case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
-        extractProctime(idx, name)
-
-      case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _), 
idx) =>
-        extractProctime(idx, name)
-
-      case (UnresolvedFieldReference(name), _) => fieldNames = name :: 
fieldNames
-
-      case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = 
name :: fieldNames
-
-      case (e, _) =>
-        throw new TableException(s"Time attributes can only be defined on 
field references. " +
-          s"Rowtime attributes can replace existing fields, proctime 
attributes can not. " +
-          s"But was: $e")
-    }
-
-    if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
-      throw new TableException(
-        "The rowtime attribute may not have the same name as an another 
field.")
-    }
-
-    if (proctime.isDefined && fieldNames.contains(proctime.get._2)) {
-      throw new TableException(
-        "The proctime attribute may not have the same name as an another 
field.")
-    }
-
-    (rowtime, proctime)
-  }
-
-  /**
-    * Injects markers for time indicator fields into the field indexes.
-    *
-    * @param fieldIndexes The field indexes into which the time indicators 
markers are injected.
-    * @param rowtime An optional rowtime indicator
-    * @param proctime An optional proctime indicator
-    * @return An adjusted array of field indexes.
-    */
-  private def adjustFieldIndexes(
-    fieldIndexes: Array[Int],
-    rowtime: Option[(Int, String)],
-    proctime: Option[(Int, String)]): Array[Int] = {
-
-    // inject rowtime field
-    val withRowtime = rowtime match {
-      case Some(rt) =>
-        fieldIndexes.patch(rt._1, 
Seq(TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER), 0)
-      case _ =>
-        fieldIndexes
-    }
-
-    // inject proctime field
-    val withProctime = proctime match {
-      case Some(pt) =>
-        withRowtime.patch(pt._1, 
Seq(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER), 0)
-      case _ =>
-        withRowtime
-    }
-
-    withProctime
-  }
-
-  /**
-    * Injects names of time indicator fields into the list of field names.
-    *
-    * @param fieldNames The array of field names into which the time indicator 
field names are
-    *                   injected.
-    * @param rowtime An optional rowtime indicator
-    * @param proctime An optional proctime indicator
-    * @return An adjusted array of field names.
-    */
-  private def adjustFieldNames(
-    fieldNames: Array[String],
-    rowtime: Option[(Int, String)],
-    proctime: Option[(Int, String)]): Array[String] = {
-
-    // inject rowtime field
-    val withRowtime = rowtime match {
-      case Some(rt) => fieldNames.patch(rt._1, Seq(rowtime.get._2), 0)
-      case _ => fieldNames
-    }
-
-    // inject proctime field
-    val withProctime = proctime match {
-      case Some(pt) => withRowtime.patch(pt._1, Seq(proctime.get._2), 0)
-      case _ => withRowtime
-    }
-
-    withProctime
-  }
-
-  /**
     * Returns the decoration rule set for this environment
     * including a custom RuleSet configuration.
     */
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
index ffe40f6..6f35d96 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
@@ -470,7 +470,7 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testGenericRow() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
@@ -484,7 +484,7 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                tableEnv.fromDataSet(dataSet);
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testGenericRowWithAlias() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
@@ -498,8 +498,8 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                tableEnv.fromDataSet(dataSet, "nullField");
        }
 
-       @Test(expected = TableException.class)
-       public void testAsWithToManyFields() throws Exception {
+       @Test(expected = ValidationException.class)
+       public void testAsWithTooManyFields() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
 
@@ -507,7 +507,7 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a, b, c, d");
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testAsWithAmbiguousFields() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
@@ -516,7 +516,7 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a, b, b");
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testAsWithNonFieldReference1() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
@@ -525,7 +525,7 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a + 1, b, c");
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testAsWithNonFieldReference2() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
@@ -534,7 +534,7 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), 
"a as foo, b,  c");
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testNonStaticClassInput() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
@@ -543,7 +543,7 @@ public class JavaTableEnvironmentITCase extends 
TableProgramsCollectionTestBase
                tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), 
"name");
        }
 
-       @Test(expected = TableException.class)
+       @Test(expected = ValidationException.class)
        public void testNonStaticClassOutput() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, config());
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
index 9783d1a..5ee31da 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api.batch.table.validation
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -98,7 +98,7 @@ class CalcValidationTest extends TableTestBase {
       util.addTable[(Int, Long, String)]("Table1", '*, 'b, 'c)
       fail("TableException expected")
     } catch {
-      case _: TableException => //ignore
+      case _: ValidationException => //ignore
     }
 
     try {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index a6be848..f9dccf1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -34,7 +34,7 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
   // schema definition by position
   // 
----------------------------------------------------------------------------------------------
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidRowtimeAliasByPosition(): Unit = {
     val util = streamTestUtil()
     // don't allow aliasing by position
@@ -62,69 +62,69 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
     util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c, 
'd, 'e)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidProctimeAttributeByPosition(): Unit = {
     val util = streamTestUtil()
     // cannot replace an attribute with proctime
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b.proctime, 'c, 'd, 'e)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRowtimeAttributeReplaceFieldOfInvalidTypeByPosition(): Unit = {
     val util = streamTestUtil()
     // cannot replace a non-time attribute with rowtime
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c.rowtime, 'd, 'e)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRowtimeAndInvalidProctimeAttributeByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 
'pt.proctime)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testOnlyOneRowtimeAttribute1ByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a.rowtime, 'b, 'c, 'd, 'e, 
'rt.rowtime)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testOnlyOneProctimeAttribute1ByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 
'pt1.proctime, 'pt2.proctime)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testRowtimeAttributeUsedNameByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 
'a.rowtime)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testProctimeAttributeUsedNameByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 
'b.proctime)
   }
 
-  @Test(expected = classOf[TableException])
-  def testAsWithToManyFieldsByPosition(): Unit = {
+  @Test(expected = classOf[ValidationException])
+  def testAsWithTooManyFieldsByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Int, Long, String)]('a, 'b, 'c, 'd)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testAsWithAmbiguousFieldsByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Int, Long, String)]('a, 'b, 'b)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testOnlyFieldRefInAsByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Int, Long, String)]('a, 'b as 'c, 'd)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidTimeCharacteristicByPosition(): Unit = {
     val data = List((1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"))
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -139,21 +139,21 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
   // schema definition by name
   // 
----------------------------------------------------------------------------------------------
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidAliasByName(): Unit = {
     val util = streamTestUtil()
     // we reference by name, but the field does not exist
     util.addTable[(Long, Int, String, Int, Long)]('x as 'r)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidFieldByName(): Unit = {
     val util = streamTestUtil()
     // we reference by name, but the field does not exist
     util.addTable[(Long, Int, String, Int, Long)]('x as 'r)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidField2ByName(): Unit = {
     val util = streamTestUtil()
     // we mix reference by position and by name
@@ -167,21 +167,21 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
     util.addTable[(Int, Long, String)]('_1, ('_2 as 'new).proctime, '_3)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidReplacingProctimeAttribute(): Unit = {
     val util = streamTestUtil()
     // proctime must not replace an existing field
     util.addTable[(Int, Long, String)]('_1, '_2.proctime, '_3)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidAliasWithRowtimeAttribute(): Unit = {
     val util = streamTestUtil()
     // aliased field does not exist
     util.addTable[(Int, Long, String)]('_1, 'newnew.rowtime as 'new, '_3)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidAliasWithRowtimeAttribute2(): Unit = {
     val util = streamTestUtil()
     // aliased field has wrong type
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
index 92cae5c..a28c0f6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.api.validation
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
@@ -28,7 +28,7 @@ class InlineTableValidationTest extends TableTestBase {
   @Test
   def testFieldNamesDuplicate() {
 
-    thrown.expect(classOf[TableException])
+    thrown.expect(classOf[ValidationException])
     thrown.expectMessage("Field names must be unique.\n" +
       "List of duplicate fields: [a].\n" +
       "List of all fields: [a, a, b].")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
index 34c9544..1b2c2c6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.TableEnvironmentTest.{CClass, PojoClass}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException
 import org.apache.flink.table.runtime.types.CRowTypeInfo
 import org.apache.flink.table.utils.TableTestBase
@@ -55,56 +55,56 @@ class TableEnvironmentValidationTest extends TableTestBase {
 
   val genericRowType = new GenericTypeInfo[Row](classOf[Row])
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidAliasInRefByPosMode(): Unit = {
     val util = batchTestUtil()
     // all references must happen position-based
     util.addTable('a, 'b, 'f2 as 'c)(tupleType)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testInvalidAliasOnAtomicType(): Unit = {
     val util = batchTestUtil()
     // alias not allowed
     util.addTable('g as 'c)(atomicType)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGetFieldInfoPojoNames1(): Unit = {
     val util = batchTestUtil()
     // duplicate name
     util.addTable('name1, 'name1, 'name3)(pojoType)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGetFieldInfoAtomicName2(): Unit = {
     val util = batchTestUtil()
     // must be only one name
     util.addTable('name1, 'name2)(atomicType)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGetFieldInfoTupleAlias3(): Unit = {
     val util = batchTestUtil()
     // fields do not exist
     util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(tupleType)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGetFieldInfoCClassAlias3(): Unit = {
     val util = batchTestUtil()
     // fields do not exist
     util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 
'name3)(caseClassType)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGetFieldInfoPojoAlias3(): Unit = {
     val util = batchTestUtil()
     // fields do not exist
     util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(pojoType)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGetFieldInfoGenericRowAlias(): Unit = {
     val util = batchTestUtil()
     // unsupported generic row type
@@ -154,8 +154,8 @@ class TableEnvironmentValidationTest extends TableTestBase {
     tEnv2.registerTable("MyTable", t1)
   }
 
-  @Test(expected = classOf[TableException])
-  def testToTableWithToManyFields(): Unit = {
+  @Test(expected = classOf[ValidationException])
+  def testToTableWithTooManyFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env)
 
@@ -164,7 +164,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
       .toTable(tEnv, 'a, 'b, 'c, 'd)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testToTableWithAmbiguousFields(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env)
@@ -174,7 +174,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
       .toTable(tEnv, 'a, 'b, 'b)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testToTableWithNonFieldReference1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env)
@@ -184,7 +184,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
       .toTable(tEnv, 'a + 1, 'b, 'c)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testToTableWithNonFieldReference2(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env)
@@ -194,7 +194,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
       .toTable(tEnv, 'a as 'foo, 'b, 'c)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGenericRow() {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tableEnv = BatchTableEnvironment.create(env)
@@ -208,7 +208,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
     tableEnv.fromDataSet(dataSet)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGenericRowWithAlias() {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tableEnv = BatchTableEnvironment.create(env)

Reply via email to