This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d894d35 [FLINK-12799][table] Improve expression based TableSchema
extraction from DataStream/DataSet
d894d35 is described below
commit d894d3517258191703a0a6dd6b42947c8a0d8d95
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Jun 13 13:32:31 2019 +0200
[FLINK-12799][table] Improve expression based TableSchema extraction from
DataStream/DataSet
---
.../flink/table/typeutils/FieldInfoUtils.java | 593 ++++++++++++++-------
.../apache/flink/table/api/BatchTableEnvImpl.scala | 35 +-
.../flink/table/api/StreamTableEnvImpl.scala | 226 +-------
.../batch/table/JavaTableEnvironmentITCase.java | 18 +-
.../table/validation/CalcValidationTest.scala | 4 +-
.../StreamTableEnvironmentValidationTest.scala | 38 +-
.../api/validation/InlineTableValidationTest.scala | 4 +-
.../TableEnvironmentValidationTest.scala | 32 +-
8 files changed, 474 insertions(+), 476 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
index 093881c..1804891 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -18,27 +18,35 @@
package org.apache.flink.table.typeutils;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionUtils;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.types.Row;
+import javax.annotation.Nullable;
+
import java.lang.reflect.Modifier;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -46,7 +54,10 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.lang.String.format;
-import static
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
+import static
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
/**
* Utility methods for extracting names and indices of fields from different
{@link TypeInformation}s.
@@ -56,15 +67,54 @@ public class FieldInfoUtils {
private static final String ATOMIC_FIELD_NAME = "f0";
/**
- * Describes extracted fields and corresponding indices from a {@link
TypeInformation}.
+ * Describes fields' names, indices and {@link DataType}s extracted
from a {@link TypeInformation} and possibly
+ * transformed via {@link Expression} application. It is in fact a
mapping between {@link TypeInformation} of an
+ * input and {@link TableSchema} of a {@link
org.apache.flink.table.api.Table} that can be created out of it.
+ *
+ * @see FieldInfoUtils#getFieldsInfo(TypeInformation)
+ * @see FieldInfoUtils#getFieldsInfo(TypeInformation, Expression[])
*/
- public static class FieldsInfo {
+ public static class TypeInfoSchema {
private final String[] fieldNames;
private final int[] indices;
-
- FieldsInfo(String[] fieldNames, int[] indices) {
+ private final DataType[] fieldTypes;
+ private final boolean isRowtimeDefined;
+
+ TypeInfoSchema(
+ String[] fieldNames,
+ int[] indices,
+ DataType[] fieldTypes,
+ boolean isRowtimeDefined) {
+ validateEqualLength(fieldNames, indices, fieldTypes);
+ validateNamesUniqueness(fieldNames);
+
+ this.isRowtimeDefined = isRowtimeDefined;
this.fieldNames = fieldNames;
this.indices = indices;
+ this.fieldTypes = fieldTypes;
+ }
+
+ private void validateEqualLength(String[] fieldNames, int[]
indices, DataType[] fieldTypes) {
+ if (fieldNames.length != indices.length ||
indices.length != fieldTypes.length) {
+ throw new
TableException(String.format("Mismatched number of indices, names and types:\n"
+
+ "Names: %s\n" +
+ "Indices: %s\n" +
+ "Types: %s",
Arrays.toString(fieldNames), Arrays.toString(indices),
Arrays.toString(fieldTypes)));
+ }
+ }
+
+ private void validateNamesUniqueness(String[] fieldNames) {
+ // check uniqueness of field names
+ Set<String> duplicatedNames =
findDuplicates(fieldNames);
+ if (duplicatedNames.size() != 0) {
+
+ throw new ValidationException(String.format(
+ "Field names must be unique.\n" +
+ "List of duplicate fields:
[%s].\n" +
+ "List of all fields: [%s].",
+ String.join(", ", duplicatedNames),
+ String.join(", ", fieldNames)));
+ }
}
public String[] getFieldNames() {
@@ -74,6 +124,18 @@ public class FieldInfoUtils {
public int[] getIndices() {
return indices;
}
+
+ public DataType[] getFieldTypes() {
+ return fieldTypes;
+ }
+
+ public boolean isRowtimeDefined() {
+ return isRowtimeDefined;
+ }
+
+ public TableSchema toTableSchema() {
+ return TableSchema.builder().fields(fieldNames,
fieldTypes).build();
+ }
}
/**
@@ -111,55 +173,116 @@ public class FieldInfoUtils {
}
/**
- * Returns field names and field positions for a given {@link
TypeInformation}.
+ * Returns a {@link TypeInfoSchema} for a given {@link TypeInformation}.
*
- * @param inputType The TypeInformation extract the field names and
positions from.
+ * @param inputType The TypeInformation to extract the mapping from.
* @param <A> The type of the TypeInformation.
- * @return A tuple of two arrays holding the field names and
corresponding field positions.
+ * @return A description of the input that enables creation of a {@link
TableSchema}.
+ * @see TypeInfoSchema
*/
- public static <A> FieldsInfo getFieldsInfo(TypeInformation<A>
inputType) {
+ public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A>
inputType) {
if (inputType instanceof GenericTypeInfo &&
inputType.getTypeClass() == Row.class) {
- throw new TableException(
+ throw new ValidationException(
"An input of GenericTypeInfo<Row> cannot be
converted to Table. " +
"Please specify the type of the input
with a RowTypeInfo.");
} else {
- return new FieldsInfo(getFieldNames(inputType),
getFieldIndices(inputType));
+ return new TypeInfoSchema(
+ getFieldNames(inputType),
+ getFieldIndices(inputType),
+
fromLegacyInfoToDataType(getFieldTypes(inputType)),
+ false);
}
}
/**
- * Returns field names and field positions for a given {@link
TypeInformation} and array of
- * {@link Expression}. It does not handle time attributes but considers
them in indices.
+ * Returns a {@link TypeInfoSchema} for a given {@link TypeInformation}.
+ * It gives control of the process of mapping {@link TypeInformation}
to {@link TableSchema}
+ * (via {@link TypeInfoSchema}).
*
- * @param inputType The {@link TypeInformation} against which the
{@link Expression}s are evaluated.
- * @param exprs The expressions that define the field names.
+ * <p>Possible operations via the expressions include:
+ * <ul>
+ * <li>specifying rowtime &amp; proctime attributes via .proctime,
.rowtime
+ * <ul>
+ * <li>There can be only a single rowtime and/or a single
proctime attribute</li>
+ * <li>Proctime attribute can only be appended to the end
of the expression list</li>
+ * <li>Rowtime attribute can replace an input field if the
input field has a compatible type.
+ * See {@link TimestampType}.</li>
+ * </ul>
+ * </li>
+ * <li>renaming fields by position (this cannot be mixed with
referencing by name)</li>
+ * <li>renaming &amp; projecting fields by name (this cannot be mixed
with referencing by position)</li>
+ * </ul>
+ *
+ * @param inputType The TypeInformation to extract the mapping from.
+ * @param expressions Expressions to apply while extracting the mapping.
* @param <A> The type of the TypeInformation.
- * @return A tuple of two arrays holding the field names and
corresponding field positions.
+ * @return A description of the input that enables creation of a {@link
TableSchema}.
+ * @see TypeInfoSchema
*/
- public static <A> FieldsInfo getFieldsInfo(TypeInformation<A>
inputType, Expression[] exprs) {
+ public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A>
inputType, Expression[] expressions) {
validateInputTypeInfo(inputType);
- final Set<FieldInfo> fieldInfos;
+ final List<FieldInfo> fieldInfos =
extractFieldInformation(inputType, expressions);
+
+ validateNoStarReference(fieldInfos);
+ boolean isRowtimeAttribute =
checkIfRowtimeAttribute(fieldInfos);
+ validateAtMostOneProctimeAttribute(fieldInfos);
+
+ String[] fieldNames =
fieldInfos.stream().map(FieldInfo::getFieldName).toArray(String[]::new);
+ int[] fieldIndices =
fieldInfos.stream().mapToInt(FieldInfo::getIndex).toArray();
+ DataType[] dataTypes =
fieldInfos.stream().map(FieldInfo::getType).toArray(DataType[]::new);
+
+ return new TypeInfoSchema(fieldNames, fieldIndices, dataTypes,
isRowtimeAttribute);
+ }
+
+ private static void validateNoStarReference(List<FieldInfo> fieldInfos)
{
+ if (fieldInfos.stream().anyMatch(info ->
info.getFieldName().equals("*"))) {
+ throw new ValidationException("Field name can not be
'*'.");
+ }
+ }
+
+ private static <A> List<FieldInfo> extractFieldInformation(
+ TypeInformation<A> inputType,
+ Expression[] exprs) {
+ final List<FieldInfo> fieldInfos;
if (inputType instanceof GenericTypeInfo &&
inputType.getTypeClass() == Row.class) {
- throw new TableException(
+ throw new ValidationException(
"An input of GenericTypeInfo<Row> cannot be
converted to Table. " +
"Please specify the type of the input
with a RowTypeInfo.");
} else if (inputType instanceof TupleTypeInfoBase) {
- fieldInfos =
extractFieldInfosFromTupleType((CompositeType) inputType, exprs);
+ fieldInfos =
extractFieldInfosFromTupleType((TupleTypeInfoBase<?>) inputType, exprs);
} else if (inputType instanceof PojoTypeInfo) {
- fieldInfos =
extractFieldInfosByNameReference((CompositeType) inputType, exprs);
+ fieldInfos =
extractFieldInfosByNameReference((CompositeType<?>) inputType, exprs);
} else {
- fieldInfos = extractFieldInfoFromAtomicType(exprs);
+ fieldInfos = extractFieldInfoFromAtomicType(inputType,
exprs);
}
+ return fieldInfos;
+ }
- if (fieldInfos.stream().anyMatch(info ->
info.getFieldName().equals("*"))) {
- throw new TableException("Field name can not be '*'.");
+ private static void validateAtMostOneProctimeAttribute(List<FieldInfo>
fieldInfos) {
+ List<FieldInfo> proctimeAttributes = fieldInfos.stream()
+ .filter(FieldInfoUtils::isProctimeField)
+ .collect(Collectors.toList());
+
+ if (proctimeAttributes.size() > 1) {
+ throw new ValidationException(
+ "The proctime attribute can only be defined
once in a table schema. Duplicated proctime attributes: " +
+ proctimeAttributes);
}
+ }
- String[] fieldNames =
fieldInfos.stream().map(FieldInfo::getFieldName).toArray(String[]::new);
- int[] fieldIndices =
fieldInfos.stream().mapToInt(FieldInfo::getIndex).toArray();
- return new FieldsInfo(fieldNames, fieldIndices);
+ private static boolean checkIfRowtimeAttribute(List<FieldInfo>
fieldInfos) {
+ List<FieldInfo> rowtimeAttributes = fieldInfos.stream()
+ .filter(FieldInfoUtils::isRowtimeField)
+ .collect(Collectors.toList());
+
+ if (rowtimeAttributes.size() > 1) {
+ throw new ValidationException(
+ "The rowtime attribute can only be defined once
in a table schema. Duplicated rowtime attributes: " +
+ rowtimeAttributes);
+ }
+ return rowtimeAttributes.size() > 0;
}
/**
@@ -180,7 +303,7 @@ public class FieldInfoUtils {
}
if (Arrays.asList(fieldNames).contains("*")) {
- throw new TableException("Field name can not be '*'.");
+ throw new ValidationException("Field name can not be
'*'.");
}
return fieldNames;
@@ -190,14 +313,14 @@ public class FieldInfoUtils {
* Validate if class represented by the typeInfo is static and globally
accessible.
*
* @param typeInfo type to check
- * @throws TableException if type does not meet these criteria
+ * @throws ValidationException if type does not meet these criteria
*/
public static <A> void validateInputTypeInfo(TypeInformation<A>
typeInfo) {
Class<A> clazz = typeInfo.getTypeClass();
if ((clazz.isMemberClass() &&
!Modifier.isStatic(clazz.getModifiers())) ||
!Modifier.isPublic(clazz.getModifiers()) ||
clazz.getCanonicalName() == null) {
- throw new TableException(format(
+ throw new ValidationException(format(
"Class '%s' described in type information '%s'
must be " +
"static and globally accessible.", clazz,
typeInfo));
}
@@ -225,7 +348,7 @@ public class FieldInfoUtils {
final TypeInformation<?>[] fieldTypes;
if (inputType instanceof CompositeType) {
int arity = inputType.getArity();
- CompositeType ct = (CompositeType) inputType;
+ CompositeType ct = (CompositeType<?>) inputType;
fieldTypes = IntStream.range(0,
arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
} else {
fieldTypes = new TypeInformation[]{inputType};
@@ -234,144 +357,62 @@ public class FieldInfoUtils {
return fieldTypes;
}
- /**
- * Derives {@link TableSchema} out of a {@link TypeInformation}. It is
complementary to other
- * methods in this class. This also performs translation from time
indicator markers such as
- * {@link TimeIndicatorTypeInfo#ROWTIME_STREAM_MARKER} etc. to a
corresponding
- * {@link TimeIndicatorTypeInfo}.
- *
- * @param typeInfo input type info to calculate fields type infos from
- * @param fieldIndexes indices within the typeInfo of the resulting
Table schema
- * @param fieldNames names of the fields of the resulting schema
- * @return calculates resulting schema
- */
- public static TableSchema calculateTableSchema(
- TypeInformation<?> typeInfo,
- int[] fieldIndexes,
- String[] fieldNames) {
-
- if (fieldIndexes.length != fieldNames.length) {
- throw new TableException(String.format(
- "Number of field names and field indexes must
be equal.\n" +
- "Number of names is %s, number of
indexes is %s.\n" +
- "List of column names: %s.\n" +
- "List of column indexes: %s.",
- fieldNames.length,
- fieldIndexes.length,
- String.join(", ", fieldNames),
-
Arrays.stream(fieldIndexes).mapToObj(Integer::toString).collect(Collectors.joining(",
"))));
- }
-
- // check uniqueness of field names
- Set<String> duplicatedNames = findDuplicates(fieldNames);
- if (duplicatedNames.size() != 0) {
-
- throw new TableException(String.format(
- "Field names must be unique.\n" +
- "List of duplicate fields: [%s].\n" +
- "List of all fields: [%s].",
- String.join(", ", duplicatedNames),
- String.join(", ", fieldNames)));
- }
-
- final TypeInformation[] types;
- long fieldIndicesCount = Arrays.stream(fieldIndexes).filter(i
-> i >= 0).count();
- if (typeInfo instanceof CompositeType) {
- CompositeType ct = (CompositeType) typeInfo;
- // it is ok to leave out fields
- if (fieldIndicesCount > ct.getArity()) {
- throw new TableException(String.format(
- "Arity of type (%s) must not be greater
than number of field names %s.",
- Arrays.toString(ct.getFieldNames()),
- Arrays.toString(fieldNames)));
- }
-
- types = Arrays.stream(fieldIndexes)
- .mapToObj(idx ->
extractTimeMarkerType(idx).orElseGet(() -> ct.getTypeAt(idx)))
- .toArray(TypeInformation[]::new);
- } else {
- if (fieldIndicesCount > 1) {
- throw new TableException(
- "Non-composite input type may have only
a single field and its index must be 0.");
- }
-
- types = Arrays.stream(fieldIndexes)
- .mapToObj(idx ->
extractTimeMarkerType(idx).orElse(typeInfo))
- .toArray(TypeInformation[]::new);
- }
-
- return new TableSchema(fieldNames, types);
- }
-
/* Utility methods */
- private static Optional<TypeInformation<?>> extractTimeMarkerType(int
idx) {
- switch (idx) {
- case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER:
- return
Optional.of(TimeIndicatorTypeInfo.ROWTIME_INDICATOR);
- case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER:
- return
Optional.of(TimeIndicatorTypeInfo.PROCTIME_INDICATOR);
- case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER:
- case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER:
- return Optional.of(Types.SQL_TIMESTAMP);
- default:
- return Optional.empty();
- }
- }
-
- private static Set<FieldInfo>
extractFieldInfoFromAtomicType(Expression[] exprs) {
- boolean referenced = false;
- FieldInfo fieldInfo = null;
- for (Expression expr : exprs) {
+ private static List<FieldInfo>
extractFieldInfoFromAtomicType(TypeInformation<?> atomicType, Expression[]
exprs) {
+ List<FieldInfo> fields = new ArrayList<>(exprs.length);
+ boolean alreadyReferenced = false;
+ for (int i = 0; i < exprs.length; i++) {
+ Expression expr = exprs[i];
if (expr instanceof UnresolvedReferenceExpression) {
- if (referenced) {
- throw new TableException("Only the
first field can reference an atomic type.");
- } else {
- referenced = true;
- fieldInfo = new
FieldInfo(((UnresolvedReferenceExpression) expr).getName(), 0);
+ if (alreadyReferenced) {
+ throw new ValidationException("Too many
fields referenced from an atomic type.");
}
- } else if (!isTimeAttribute(expr)) { // IGNORE Time
attributes
- throw new TableException("Field reference
expression expected.");
- }
- }
- if (fieldInfo != null) {
- return Collections.singleton(fieldInfo);
+ alreadyReferenced = true;
+ String name = ((UnresolvedReferenceExpression)
expr).getName();
+ fields.add(new FieldInfo(name, i,
fromLegacyInfoToDataType(atomicType)));
+ } else if (isRowTimeExpression(expr)) {
+ UnresolvedReferenceExpression reference =
getChildAsReference(expr);
+ fields.add(createTimeAttributeField(reference,
TimestampKind.ROWTIME, null));
+ } else if (isProcTimeExpression(expr)) {
+ UnresolvedReferenceExpression reference =
getChildAsReference(expr);
+ fields.add(createTimeAttributeField(reference,
TimestampKind.PROCTIME, null));
+ } else {
+ throw new ValidationException("Field reference
expression expected.");
+ }
}
-
- return Collections.emptySet();
- }
-
- private static <A> Set<FieldInfo>
extractFieldInfosByNameReference(CompositeType inputType, Expression[] exprs) {
- ExprToFieldInfo exprToFieldInfo = new
ExprToFieldInfo(inputType);
- return Arrays.stream(exprs)
- .map(expr -> expr.accept(exprToFieldInfo))
- .filter(Optional::isPresent)
- .map(Optional::get)
- .collect(Collectors.toCollection(LinkedHashSet::new));
+ return fields;
}
- private static <A> Set<FieldInfo>
extractFieldInfosFromTupleType(CompositeType inputType, Expression[] exprs) {
- boolean isRefByPos = isReferenceByPosition((CompositeType<?>)
inputType, exprs);
+ private static List<FieldInfo>
extractFieldInfosFromTupleType(TupleTypeInfoBase<?> inputType, Expression[]
exprs) {
+ boolean isRefByPos = isReferenceByPosition(inputType, exprs);
if (isRefByPos) {
return IntStream.range(0, exprs.length)
- .mapToObj(idx -> exprs[idx].accept(new
IndexedExprToFieldInfo(idx)))
- .filter(Optional::isPresent)
- .map(Optional::get)
-
.collect(Collectors.toCollection(LinkedHashSet::new));
+ .mapToObj(idx -> exprs[idx].accept(new
IndexedExprToFieldInfo(inputType, idx)))
+ .collect(Collectors.toList());
} else {
return extractFieldInfosByNameReference(inputType,
exprs);
}
}
+ private static List<FieldInfo>
extractFieldInfosByNameReference(CompositeType<?> inputType, Expression[]
exprs) {
+ ExprToFieldInfo exprToFieldInfo = new
ExprToFieldInfo(inputType);
+ return Arrays.stream(exprs)
+ .map(expr -> expr.accept(exprToFieldInfo))
+ .collect(Collectors.toList());
+ }
+
private static class FieldInfo {
private final String fieldName;
private final int index;
+ private final DataType type;
- FieldInfo(String fieldName, int index) {
+ FieldInfo(String fieldName, int index, DataType type) {
this.fieldName = fieldName;
this.index = index;
+ this.type = type;
}
public String getFieldName() {
@@ -381,51 +422,90 @@ public class FieldInfoUtils {
public int getIndex() {
return index;
}
+
+ public DataType getType() {
+ return type;
+ }
}
- private static class IndexedExprToFieldInfo extends
ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+ private static class IndexedExprToFieldInfo extends
ApiExpressionDefaultVisitor<FieldInfo> {
+ private final CompositeType<?> inputType;
private final int index;
- private IndexedExprToFieldInfo(int index) {
+ private IndexedExprToFieldInfo(CompositeType<?> inputType, int
index) {
+ this.inputType = inputType;
this.index = index;
}
@Override
- public Optional<FieldInfo>
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+ public FieldInfo
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
String fieldName = unresolvedReference.getName();
- return Optional.of(new FieldInfo(fieldName, index));
+ return new FieldInfo(fieldName, index,
fromLegacyInfoToDataType(getTypeAt(unresolvedReference)));
}
@Override
- public Optional<FieldInfo> visitCall(CallExpression call) {
+ public FieldInfo visitCall(CallExpression call) {
if (call.getFunctionDefinition() ==
BuiltInFunctionDefinitions.AS) {
- List<Expression> children = call.getChildren();
- Expression origExpr = children.get(0);
- String newName =
ExpressionUtils.extractValue(children.get(1), String.class)
- .orElseThrow(() ->
- new TableException("Alias
expects string literal as new name. Got: " + children.get(1)));
-
- if (origExpr instanceof
UnresolvedReferenceExpression) {
- throw new TableException(
- format("Alias '%s' is not
allowed if other fields are referenced by position.", newName));
- } else if (isTimeAttribute(origExpr)) {
- return Optional.empty();
- }
- } else if (isTimeAttribute(call)) {
- return Optional.empty();
+ return visitAlias(call);
+ } else if (isRowTimeExpression(call)) {
+ validateRowtimeReplacesCompatibleType(call);
+ return
createTimeAttributeField(getChildAsReference(call), TimestampKind.ROWTIME,
null);
+ } else if (isProcTimeExpression(call)) {
+ validateProcTimeAttributeAppended(call);
+ return
createTimeAttributeField(getChildAsReference(call), TimestampKind.PROCTIME,
null);
}
return defaultMethod(call);
}
+ private FieldInfo visitAlias(CallExpression call) {
+ List<Expression> children = call.getChildren();
+ String newName = extractAlias(children.get(1));
+
+ Expression child = children.get(0);
+ if (isProcTimeExpression(child)) {
+ validateProcTimeAttributeAppended(call);
+ return
createTimeAttributeField(getChildAsReference(child), TimestampKind.PROCTIME,
newName);
+ } else {
+ throw new ValidationException(
+ format("Alias '%s' is not allowed if
other fields are referenced by position.", newName));
+ }
+ }
+
+ private void
validateRowtimeReplacesCompatibleType(CallExpression call) {
+ if (index < inputType.getArity()) {
+ checkRowtimeType(getTypeAt(call));
+ }
+ }
+
+ private void validateProcTimeAttributeAppended(CallExpression
call) {
+ if (index < inputType.getArity()) {
+ throw new
ValidationException(String.format("The proctime attribute can only be appended
to the" +
+ " table schema and not replace an
existing field. Please move '%s' to the end of the" +
+ " schema.", call));
+ }
+ }
+
+ private TypeInformation<Object> getTypeAt(Expression expr) {
+ if (index >= inputType.getArity()) {
+ throw new ValidationException(String.format(
+ "Number of expressions does not match
number of input fields.\n" +
+ "Available fields: %s\n" +
+ "Could not map: %s",
+
Arrays.toString(inputType.getFieldNames()),
+ expr));
+ }
+ return inputType.getTypeAt(index);
+ }
+
@Override
- protected Optional<FieldInfo> defaultMethod(Expression
expression) {
- throw new TableException("Field reference expression or
alias on field expression expected.");
+ protected FieldInfo defaultMethod(Expression expression) {
+ throw new ValidationException("Field reference
expression or alias on field expression expected.");
}
}
- private static class ExprToFieldInfo extends
ApiExpressionDefaultVisitor<Optional<FieldInfo>> {
+ private static class ExprToFieldInfo extends
ApiExpressionDefaultVisitor<FieldInfo> {
private final CompositeType ct;
@@ -433,53 +513,136 @@ public class FieldInfoUtils {
this.ct = ct;
}
+ private ValidationException fieldNotFound(String name) {
+ return new ValidationException(format(
+ "%s is not a field of type %s. Expected: %s}",
+ name,
+ ct,
+ String.join(", ", ct.getFieldNames())));
+ }
+
@Override
- public Optional<FieldInfo>
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
- String fieldName = unresolvedReference.getName();
- return referenceByName(fieldName, ct).map(idx -> new
FieldInfo(fieldName, idx));
+ public FieldInfo
visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
+ return createFieldInfo(unresolvedReference, null);
}
@Override
- public Optional<FieldInfo> visitCall(CallExpression call) {
+ public FieldInfo visitCall(CallExpression call) {
if (call.getFunctionDefinition() ==
BuiltInFunctionDefinitions.AS) {
- List<Expression> children = call.getChildren();
- Expression origExpr = children.get(0);
- String newName =
ExpressionUtils.extractValue(children.get(1), String.class)
- .orElseThrow(() ->
- new TableException("Alias
expects string literal as new name. Got: " + children.get(1)));
-
- if (origExpr instanceof
UnresolvedReferenceExpression) {
- return
referenceByName(((UnresolvedReferenceExpression) origExpr).getName(), ct)
- .map(idx -> new
FieldInfo(newName, idx));
- } else if (isTimeAttribute(origExpr)) {
- return Optional.empty();
- }
- } else if (isTimeAttribute(call)) {
- return Optional.empty();
+ return visitAlias(call);
+ } else if (isRowTimeExpression(call)) {
+ return createRowtimeFieldInfo(call, null);
+ } else if (isProcTimeExpression(call)) {
+ return createProctimeFieldInfo(call, null);
}
return defaultMethod(call);
}
+ private FieldInfo visitAlias(CallExpression call) {
+ List<Expression> children = call.getChildren();
+ String newName = extractAlias(children.get(1));
+
+ Expression child = children.get(0);
+ if (child instanceof UnresolvedReferenceExpression) {
+ return
createFieldInfo((UnresolvedReferenceExpression) child, newName);
+ } else if (isRowTimeExpression(child)) {
+ return createRowtimeFieldInfo(child, newName);
+ } else if (isProcTimeExpression(child)) {
+ return createProctimeFieldInfo(child, newName);
+ } else {
+ return defaultMethod(call);
+ }
+ }
+
+ private FieldInfo createFieldInfo(UnresolvedReferenceExpression
unresolvedReference, @Nullable String alias) {
+ String fieldName = unresolvedReference.getName();
+ return referenceByName(fieldName, ct)
+ .map(idx -> new FieldInfo(
+ alias != null ? alias : fieldName,
+ idx,
+
fromLegacyInfoToDataType(ct.getTypeAt(idx))))
+ .orElseThrow(() -> fieldNotFound(fieldName));
+ }
+
+ private FieldInfo createProctimeFieldInfo(Expression
expression, @Nullable String alias) {
+ UnresolvedReferenceExpression reference =
getChildAsReference(expression);
+ String originalName = reference.getName();
+ validateProctimeDoesNotReplaceField(originalName);
+
+ return createTimeAttributeField(reference,
TimestampKind.PROCTIME, alias);
+ }
+
+ private void validateProctimeDoesNotReplaceField(String
originalName) {
+ if (referenceByName(originalName, ct).isPresent()) {
+ throw new ValidationException(String.format(
+ "The proctime attribute '%s' must not
replace an existing field.",
+ originalName));
+ }
+ }
+
+ private FieldInfo createRowtimeFieldInfo(Expression expression,
@Nullable String alias) {
+ UnresolvedReferenceExpression reference =
getChildAsReference(expression);
+ String originalName = reference.getName();
+ verifyReferencesValidField(originalName, alias);
+
+ return createTimeAttributeField(reference,
TimestampKind.ROWTIME, alias);
+ }
+
+ private void verifyReferencesValidField(String origName,
@Nullable String alias) {
+ Optional<Integer> refId = referenceByName(origName, ct);
+ if (refId.isPresent()) {
+ checkRowtimeType(ct.getTypeAt(refId.get()));
+ } else if (alias != null) {
+ throw new
ValidationException(String.format("Alias '%s' must reference an existing
field.", alias));
+ }
+ }
+
@Override
- protected Optional<FieldInfo> defaultMethod(Expression
expression) {
- throw new TableException("Field reference expression or
alias on field expression expected.");
+ protected FieldInfo defaultMethod(Expression expression) {
+ throw new ValidationException("Field reference
expression or alias on field expression expected.");
}
}
- private static boolean isTimeAttribute(Expression origExpr) {
+ private static String extractAlias(Expression aliasExpr) {
+ return ExpressionUtils.extractValue(aliasExpr, String.class)
+ .orElseThrow(() -> new TableException("Alias expects
string literal as new name. Got: " + aliasExpr));
+ }
+
+ private static void checkRowtimeType(TypeInformation<?> type) {
+ if (!(type.equals(Types.LONG()) || type instanceof
SqlTimeTypeInfo)) {
+ throw new ValidationException(
+ "The rowtime attribute can only replace a field
with a valid time type, " +
+ "such as Timestamp or Long. But was: "
+ type);
+ }
+ }
+
+ private static boolean isRowtimeField(FieldInfo field) {
+ DataType type = field.getType();
+ return hasRoot(type.getLogicalType(),
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
+ isRowtimeAttribute(type.getLogicalType());
+ }
+
+ private static boolean isProctimeField(FieldInfo field) {
+ DataType type = field.getType();
+ return hasRoot(type.getLogicalType(),
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
+ isProctimeAttribute(type.getLogicalType());
+ }
+
+ private static boolean isRowTimeExpression(Expression origExpr) {
+ return origExpr instanceof CallExpression &&
+ ((CallExpression) origExpr).getFunctionDefinition() ==
BuiltInFunctionDefinitions.ROWTIME;
+ }
+
+ private static boolean isProcTimeExpression(Expression origExpr) {
return origExpr instanceof CallExpression &&
- TIME_ATTRIBUTES.contains(((CallExpression)
origExpr).getFunctionDefinition());
+ ((CallExpression) origExpr).getFunctionDefinition() ==
BuiltInFunctionDefinitions.PROCTIME;
}
private static Optional<Integer> referenceByName(String name,
CompositeType<?> ct) {
int inputIdx = ct.getFieldIndex(name);
if (inputIdx < 0) {
- throw new TableException(format(
- "%s is not a field of type %s. Expected: %s}",
- name,
- ct,
- String.join(", ", ct.getFieldNames())));
+ return Optional.empty();
} else {
return Optional.of(inputIdx);
}
@@ -500,6 +663,38 @@ public class FieldInfoUtils {
return duplicates;
}
+ private static FieldInfo createTimeAttributeField(
+ UnresolvedReferenceExpression reference,
+ TimestampKind kind,
+ @Nullable String alias) {
+ final int idx;
+ if (kind == TimestampKind.PROCTIME) {
+ idx = TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
+ } else {
+ idx = TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
+ }
+
+ String originalName = reference.getName();
+ return new FieldInfo(
+ alias != null ? alias : originalName,
+ idx,
+ createTimeIndicatorType(kind));
+ }
+
+ private static UnresolvedReferenceExpression
getChildAsReference(Expression expression) {
+ Expression child = expression.getChildren().get(0);
+ if (child instanceof UnresolvedReferenceExpression) {
+ return (UnresolvedReferenceExpression) child;
+ }
+
+ throw new ValidationException("Field reference expression
expected.");
+ }
+
+ private static DataType createTimeIndicatorType(TimestampKind kind) {
+ return new AtomicDataType(new TimestampType(true, kind, 3))
+ .bridgedTo(java.sql.Timestamp.class);
+ }
+
private FieldInfoUtils() {
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index 7c7eefe..17745ec 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.descriptors.{BatchTableDescriptor,
ConnectorDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
-import org.apache.flink.table.expressions.{CallExpression, Expression}
+import org.apache.flink.table.expressions.{CallExpression, Expression,
ExpressionDefaultVisitor}
import org.apache.flink.table.operations.DataSetQueryOperation
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -42,10 +42,11 @@ import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{BatchTableSource,
InputFormatTableSource, TableSource, TableSourceUtil}
import
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema,
getFieldsInfo, validateInputTypeInfo}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo,
validateInputTypeInfo}
import org.apache.flink.table.utils.TableConnectorUtils
import org.apache.flink.types.Row
+import _root_.scala.collection.JavaConverters._
/**
* The abstract base class for the implementation of batch TableEnvironments.
*
@@ -206,23 +207,35 @@ abstract class BatchTableEnvImpl(
val fieldsInfo = fields match {
case Some(f) =>
- if (f.exists(f =>
- f.isInstanceOf[CallExpression] &&
-
TIME_ATTRIBUTES.contains(f.asInstanceOf[CallExpression].getFunctionDefinition)))
{
- throw new ValidationException(
- ".rowtime and .proctime time indicators are not allowed in a batch
environment.")
- }
-
+ checkNoTimeAttributes(f)
getFieldsInfo[T](inputType, f)
+
case None => getFieldsInfo[T](inputType)
}
- val tableOperation = new DataSetQueryOperation[T](dataSet,
+ val tableOperation = new DataSetQueryOperation[T](
+ dataSet,
fieldsInfo.getIndices,
- calculateTableSchema(inputType, fieldsInfo.getIndices,
fieldsInfo.getFieldNames))
+ fieldsInfo.toTableSchema)
tableOperation
}
+ private def checkNoTimeAttributes[T](f: Array[Expression]) = {
+ if (f.exists(f =>
+ f.accept(new ExpressionDefaultVisitor[Boolean] {
+
+ override def visitCall(call: CallExpression): Boolean = {
+ TIME_ATTRIBUTES.contains(call.getFunctionDefinition) ||
+ call.getChildren.asScala.exists(_.accept(this))
+ }
+
+ override protected def defaultMethod(expression: Expression): Boolean
= false
+ }))) {
+ throw new ValidationException(
+ ".rowtime and .proctime time indicators are not allowed in a batch
environment.")
+ }
+ }
+
/**
* Returns the built-in normalization rules that are defined by the
environment.
*/
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index c351a46..49103f7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -28,7 +28,6 @@ import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -52,8 +51,7 @@ import org.apache.flink.table.runtime.{CRowMapRunner,
OutputRowtimeProcessFuncti
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{StreamTableSource, TableSource,
TableSourceUtil}
import
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema,
getFieldsInfo, isReferenceByPosition}
-import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo
import _root_.scala.collection.JavaConverters._
@@ -325,240 +323,32 @@ abstract class StreamTableEnvImpl(
val streamType = dataStream.getType
// get field names and types for all non-replaced fields
- val (indices, names) = fields match {
+ val fieldsInfo = fields match {
case Some(f) =>
// validate and extract time attributes
val fieldsInfo = getFieldsInfo[T](streamType, f)
- val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType,
f)
// check if event-time is enabled
- if (rowtime.isDefined &&
+ if (fieldsInfo.isRowtimeDefined &&
execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime)
{
- throw new TableException(
+ throw new ValidationException(
s"A rowtime attribute requires an EventTime time characteristic in
stream environment" +
s". But is: ${execEnv.getStreamTimeCharacteristic}")
}
- // adjust field indexes and field names
- val indexesWithIndicatorFields = adjustFieldIndexes(
- fieldsInfo.getIndices,
- rowtime,
- proctime)
- val namesWithIndicatorFields = adjustFieldNames(
- fieldsInfo.getFieldNames,
- rowtime,
- proctime)
-
- (indexesWithIndicatorFields, namesWithIndicatorFields)
+ fieldsInfo
case None =>
- val fieldsInfo = getFieldsInfo[T](streamType)
- (fieldsInfo.getIndices, fieldsInfo.getFieldNames)
+ getFieldsInfo[T](streamType)
}
val dataStreamTable = new DataStreamQueryOperation(
dataStream,
- indices,
- calculateTableSchema(streamType, indices, names))
+ fieldsInfo.getIndices,
+ fieldsInfo.toTableSchema)
dataStreamTable
}
/**
- * Checks for at most one rowtime and proctime attribute.
- * Returns the time attributes.
- *
- * @return rowtime attribute and proctime attribute
- */
- private def validateAndExtractTimeAttributes(
- streamType: TypeInformation[_],
- exprs: Array[Expression])
- : (Option[(Int, String)], Option[(Int, String)]) = {
-
- val (isRefByPos, fieldTypes) = streamType match {
- case c: CompositeType[_] =>
- // determine schema definition mode (by position or by name)
- (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i =>
c.getTypeAt(i)).toArray)
- case t: TypeInformation[_] =>
- (false, Array(t))
- }
-
- var fieldNames: List[String] = Nil
- var rowtime: Option[(Int, String)] = None
- var proctime: Option[(Int, String)] = None
-
- def checkRowtimeType(t: TypeInformation[_]): Unit = {
- if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
- throw new TableException(
- s"The rowtime attribute can only replace a field with a valid time
type, " +
- s"such as Timestamp or Long. But was: $t")
- }
- }
-
- def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit
= {
- if (rowtime.isDefined) {
- throw new TableException(
- "The rowtime attribute can only be defined once in a table schema.")
- } else {
- // if the fields are referenced by position,
- // it is possible to replace an existing field or append the time
attribute at the end
- if (isRefByPos) {
- // aliases are not permitted
- if (origName.isDefined) {
- throw new TableException(
- s"Invalid alias '${origName.get}' because fields are referenced
by position.")
- }
- // check type of field that is replaced
- if (idx < fieldTypes.length) {
- checkRowtimeType(fieldTypes(idx))
- }
- }
- // check reference-by-name
- else {
- val aliasOrName = origName.getOrElse(name)
- streamType match {
- // both alias and reference must have a valid type if they replace
a field
- case ct: CompositeType[_] if ct.hasField(aliasOrName) =>
- val t = ct.getTypeAt(ct.getFieldIndex(aliasOrName))
- checkRowtimeType(t)
- // alias could not be found
- case _ if origName.isDefined =>
- throw new TableException(s"Alias '${origName.get}' must
reference an existing field.")
- case _ => // ok
- }
- }
-
- rowtime = Some(idx, name)
- }
- }
-
- def extractProctime(idx: Int, name: String): Unit = {
- if (proctime.isDefined) {
- throw new TableException(
- "The proctime attribute can only be defined once in a table
schema.")
- } else {
- // if the fields are referenced by position,
- // it is only possible to append the time attribute at the end
- if (isRefByPos) {
-
- // check that proctime is only appended
- if (idx < fieldTypes.length) {
- throw new TableException(
- "The proctime attribute can only be appended to the table schema
and not replace " +
- s"an existing field. Please move '$name' to the end of the
schema.")
- }
- }
- // check reference-by-name
- else {
- streamType match {
- // proctime attribute must not replace a field
- case ct: CompositeType[_] if ct.hasField(name) =>
- throw new TableException(
- s"The proctime attribute '$name' must not replace an existing
field.")
- case _ => // ok
- }
- }
- proctime = Some(idx, name)
- }
- }
-
- val bridgedFields = exprs.map(expressionBridge.bridge).toArray[Expression]
- bridgedFields.zipWithIndex.foreach {
- case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
- extractRowtime(idx, name, None)
-
- case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name,
_), idx) =>
- extractRowtime(idx, name, Some(origName))
-
- case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
- extractProctime(idx, name)
-
- case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _),
idx) =>
- extractProctime(idx, name)
-
- case (UnresolvedFieldReference(name), _) => fieldNames = name ::
fieldNames
-
- case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames =
name :: fieldNames
-
- case (e, _) =>
- throw new TableException(s"Time attributes can only be defined on
field references. " +
- s"Rowtime attributes can replace existing fields, proctime
attributes can not. " +
- s"But was: $e")
- }
-
- if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
- throw new TableException(
- "The rowtime attribute may not have the same name as an another
field.")
- }
-
- if (proctime.isDefined && fieldNames.contains(proctime.get._2)) {
- throw new TableException(
- "The proctime attribute may not have the same name as an another
field.")
- }
-
- (rowtime, proctime)
- }
-
- /**
- * Injects markers for time indicator fields into the field indexes.
- *
- * @param fieldIndexes The field indexes into which the time indicators
markers are injected.
- * @param rowtime An optional rowtime indicator
- * @param proctime An optional proctime indicator
- * @return An adjusted array of field indexes.
- */
- private def adjustFieldIndexes(
- fieldIndexes: Array[Int],
- rowtime: Option[(Int, String)],
- proctime: Option[(Int, String)]): Array[Int] = {
-
- // inject rowtime field
- val withRowtime = rowtime match {
- case Some(rt) =>
- fieldIndexes.patch(rt._1,
Seq(TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER), 0)
- case _ =>
- fieldIndexes
- }
-
- // inject proctime field
- val withProctime = proctime match {
- case Some(pt) =>
- withRowtime.patch(pt._1,
Seq(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER), 0)
- case _ =>
- withRowtime
- }
-
- withProctime
- }
-
- /**
- * Injects names of time indicator fields into the list of field names.
- *
- * @param fieldNames The array of field names into which the time indicator
field names are
- * injected.
- * @param rowtime An optional rowtime indicator
- * @param proctime An optional proctime indicator
- * @return An adjusted array of field names.
- */
- private def adjustFieldNames(
- fieldNames: Array[String],
- rowtime: Option[(Int, String)],
- proctime: Option[(Int, String)]): Array[String] = {
-
- // inject rowtime field
- val withRowtime = rowtime match {
- case Some(rt) => fieldNames.patch(rt._1, Seq(rowtime.get._2), 0)
- case _ => fieldNames
- }
-
- // inject proctime field
- val withProctime = proctime match {
- case Some(pt) => withRowtime.patch(pt._1, Seq(proctime.get._2), 0)
- case _ => withRowtime
- }
-
- withProctime
- }
-
- /**
* Returns the decoration rule set for this environment
* including a custom RuleSet configuration.
*/
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
index ffe40f6..6f35d96 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
@@ -470,7 +470,7 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
compareResultAsText(results, expected);
}
- @Test(expected = TableException.class)
+ @Test(expected = ValidationException.class)
public void testGenericRow() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
@@ -484,7 +484,7 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
tableEnv.fromDataSet(dataSet);
}
- @Test(expected = TableException.class)
+ @Test(expected = ValidationException.class)
public void testGenericRowWithAlias() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
@@ -498,8 +498,8 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
tableEnv.fromDataSet(dataSet, "nullField");
}
- @Test(expected = TableException.class)
- public void testAsWithToManyFields() throws Exception {
+ @Test(expected = ValidationException.class)
+ public void testAsWithTooManyFields() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
@@ -507,7 +507,7 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env),
"a, b, c, d");
}
- @Test(expected = TableException.class)
+ @Test(expected = ValidationException.class)
public void testAsWithAmbiguousFields() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
@@ -516,7 +516,7 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env),
"a, b, b");
}
- @Test(expected = TableException.class)
+ @Test(expected = ValidationException.class)
public void testAsWithNonFieldReference1() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
@@ -525,7 +525,7 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env),
"a + 1, b, c");
}
- @Test(expected = TableException.class)
+ @Test(expected = ValidationException.class)
public void testAsWithNonFieldReference2() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
@@ -534,7 +534,7 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env),
"a as foo, b, c");
}
- @Test(expected = TableException.class)
+ @Test(expected = ValidationException.class)
public void testNonStaticClassInput() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
@@ -543,7 +543,7 @@ public class JavaTableEnvironmentITCase extends
TableProgramsCollectionTestBase
tableEnv.fromDataSet(env.fromElements(new MyNonStatic()),
"name");
}
- @Test(expected = TableException.class)
+ @Test(expected = ValidationException.class)
public void testNonStaticClassOutput() throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv =
BatchTableEnvironment.create(env, config());
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
index 9783d1a..5ee31da 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/CalcValidationTest.scala
@@ -19,8 +19,8 @@
package org.apache.flink.table.api.batch.table.validation
import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
import org.junit.Assert._
@@ -98,7 +98,7 @@ class CalcValidationTest extends TableTestBase {
util.addTable[(Int, Long, String)]("Table1", '*, 'b, 'c)
fail("TableException expected")
} catch {
- case _: TableException => //ignore
+ case _: ValidationException => //ignore
}
try {
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index a6be848..f9dccf1 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -34,7 +34,7 @@ class StreamTableEnvironmentValidationTest extends
TableTestBase {
// schema definition by position
//
----------------------------------------------------------------------------------------------
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidRowtimeAliasByPosition(): Unit = {
val util = streamTestUtil()
// don't allow aliasing by position
@@ -62,69 +62,69 @@ class StreamTableEnvironmentValidationTest extends
TableTestBase {
util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c,
'd, 'e)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidProctimeAttributeByPosition(): Unit = {
val util = streamTestUtil()
// cannot replace an attribute with proctime
util.addTable[(Long, Int, String, Int, Long)]('a, 'b.proctime, 'c, 'd, 'e)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testRowtimeAttributeReplaceFieldOfInvalidTypeByPosition(): Unit = {
val util = streamTestUtil()
// cannot replace a non-time attribute with rowtime
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c.rowtime, 'd, 'e)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testRowtimeAndInvalidProctimeAttributeByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd,
'pt.proctime)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testOnlyOneRowtimeAttribute1ByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a.rowtime, 'b, 'c, 'd, 'e,
'rt.rowtime)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testOnlyOneProctimeAttribute1ByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e,
'pt1.proctime, 'pt2.proctime)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testRowtimeAttributeUsedNameByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e,
'a.rowtime)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testProctimeAttributeUsedNameByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e,
'b.proctime)
}
- @Test(expected = classOf[TableException])
- def testAsWithToManyFieldsByPosition(): Unit = {
+ @Test(expected = classOf[ValidationException])
+ def testAsWithTooManyFieldsByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Int, Long, String)]('a, 'b, 'c, 'd)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testAsWithAmbiguousFieldsByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Int, Long, String)]('a, 'b, 'b)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testOnlyFieldRefInAsByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Int, Long, String)]('a, 'b as 'c, 'd)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidTimeCharacteristicByPosition(): Unit = {
val data = List((1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"))
val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -139,21 +139,21 @@ class StreamTableEnvironmentValidationTest extends
TableTestBase {
// schema definition by name
//
----------------------------------------------------------------------------------------------
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidAliasByName(): Unit = {
val util = streamTestUtil()
// we reference by name, but the field does not exist
util.addTable[(Long, Int, String, Int, Long)]('x as 'r)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidFieldByName(): Unit = {
val util = streamTestUtil()
// we reference by name, but the field does not exist
util.addTable[(Long, Int, String, Int, Long)]('x as 'r)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidField2ByName(): Unit = {
val util = streamTestUtil()
// we mix reference by position and by name
@@ -167,21 +167,21 @@ class StreamTableEnvironmentValidationTest extends
TableTestBase {
util.addTable[(Int, Long, String)]('_1, ('_2 as 'new).proctime, '_3)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidReplacingProctimeAttribute(): Unit = {
val util = streamTestUtil()
// proctime must not replace an existing field
util.addTable[(Int, Long, String)]('_1, '_2.proctime, '_3)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidAliasWithRowtimeAttribute(): Unit = {
val util = streamTestUtil()
// aliased field does not exist
util.addTable[(Int, Long, String)]('_1, 'newnew.rowtime as 'new, '_3)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidAliasWithRowtimeAttribute2(): Unit = {
val util = streamTestUtil()
// aliased field has wrong type
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
index 92cae5c..a28c0f6 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.api.validation
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test
@@ -28,7 +28,7 @@ class InlineTableValidationTest extends TableTestBase {
@Test
def testFieldNamesDuplicate() {
- thrown.expect(classOf[TableException])
+ thrown.expect(classOf[ValidationException])
thrown.expectMessage("Field names must be unique.\n" +
"List of duplicate fields: [a].\n" +
"List of all fields: [a, a, b].")
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
index 34c9544..1b2c2c6 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.TableEnvironmentTest.{CClass, PojoClass}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException
import org.apache.flink.table.runtime.types.CRowTypeInfo
import org.apache.flink.table.utils.TableTestBase
@@ -55,56 +55,56 @@ class TableEnvironmentValidationTest extends TableTestBase {
val genericRowType = new GenericTypeInfo[Row](classOf[Row])
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidAliasInRefByPosMode(): Unit = {
val util = batchTestUtil()
// all references must happen position-based
util.addTable('a, 'b, 'f2 as 'c)(tupleType)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testInvalidAliasOnAtomicType(): Unit = {
val util = batchTestUtil()
// alias not allowed
util.addTable('g as 'c)(atomicType)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGetFieldInfoPojoNames1(): Unit = {
val util = batchTestUtil()
// duplicate name
util.addTable('name1, 'name1, 'name3)(pojoType)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGetFieldInfoAtomicName2(): Unit = {
val util = batchTestUtil()
// must be only one name
util.addTable('name1, 'name2)(atomicType)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGetFieldInfoTupleAlias3(): Unit = {
val util = batchTestUtil()
// fields do not exist
util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(tupleType)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGetFieldInfoCClassAlias3(): Unit = {
val util = batchTestUtil()
// fields do not exist
util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as
'name3)(caseClassType)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGetFieldInfoPojoAlias3(): Unit = {
val util = batchTestUtil()
// fields do not exist
util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(pojoType)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGetFieldInfoGenericRowAlias(): Unit = {
val util = batchTestUtil()
// unsupported generic row type
@@ -154,8 +154,8 @@ class TableEnvironmentValidationTest extends TableTestBase {
tEnv2.registerTable("MyTable", t1)
}
- @Test(expected = classOf[TableException])
- def testToTableWithToManyFields(): Unit = {
+ @Test(expected = classOf[ValidationException])
+ def testToTableWithTooManyFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
@@ -164,7 +164,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
.toTable(tEnv, 'a, 'b, 'c, 'd)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testToTableWithAmbiguousFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
@@ -174,7 +174,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
.toTable(tEnv, 'a, 'b, 'b)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testToTableWithNonFieldReference1(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
@@ -184,7 +184,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
.toTable(tEnv, 'a + 1, 'b, 'c)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testToTableWithNonFieldReference2(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
@@ -194,7 +194,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
.toTable(tEnv, 'a as 'foo, 'b, 'c)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGenericRow() {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
@@ -208,7 +208,7 @@ class TableEnvironmentValidationTest extends TableTestBase {
tableEnv.fromDataSet(dataSet)
}
- @Test(expected = classOf[TableException])
+ @Test(expected = classOf[ValidationException])
def testGenericRowWithAlias() {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)