This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 514b51de832fec320caf9cc73eb39d35910a1c79 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Wed Jul 17 20:23:37 2019 +0800 [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer --- .../table/sources/tsextractors/ExistingField.java | 136 +++++++++++++++++++++ .../table/expressions/ResolvedFieldReference.java | 29 ++++- .../expressions/ExestingFieldFieldReference.scala | 26 ---- .../table/expressions/PlannerExpressionUtils.scala | 6 +- .../org/apache/flink/table/expressions/call.scala | 4 +- .../apache/flink/table/expressions/composite.scala | 2 +- .../flink/table/expressions/fieldExpression.scala | 2 +- .../flink/table/sources/TableSourceUtil.scala | 13 +- .../table/sources/tsextractors/ExistingField.scala | 111 ----------------- .../ExtendedAggregateExtractProjectRule.java | 4 +- .../table/expressions/PlannerExpressionUtils.scala | 6 +- .../org/apache/flink/table/expressions/call.scala | 4 +- .../apache/flink/table/expressions/composite.scala | 2 +- .../flink/table/expressions/fieldExpression.scala | 2 +- .../DataStreamGroupWindowAggregateBase.scala | 4 +- .../flink/table/sources/TableSourceUtil.scala | 29 +++-- .../table/sources/tsextractors/ExistingField.scala | 88 ------------- .../flink/table/descriptors/RowtimeTest.scala | 19 ++- .../table/utils/TestFilterableTableSource.scala | 22 ++-- 19 files changed, 230 insertions(+), 279 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java new file mode 100644 index 0000000..0e8d73f --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources.tsextractors; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.descriptors.Rowtime; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedFieldReference; +import org.apache.flink.table.types.DataType; + +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral; +import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Converts an existing {@link Long}, {@link java.sql.Timestamp}, or + * timestamp formatted java.lang.String field (e.g., "2018-05-28 12:34:56.000") into + * a rowtime attribute. + */ +@PublicEvolving +public final class ExistingField extends TimestampExtractor { + + private static final long serialVersionUID = 1L; + + private String field; + + /** + * @param field The field to convert into a rowtime attribute. + */ + public ExistingField(String field) { + this.field = checkNotNull(field); + } + + @Override + public String[] getArgumentFields() { + return new String[] {field}; + } + + @Override + public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) { + DataType fieldType = fromLegacyInfoToDataType(argumentFieldTypes[0]); + + switch (fieldType.getLogicalType().getTypeRoot()) { + case BIGINT: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case VARCHAR: + break; + default: + throw new ValidationException(String.format( + "Field '%s' must be of type Long or Timestamp or String but is of type %s.", + field, fieldType)); + } + } + + /** + * Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp}, or + * timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000") + * into a rowtime attribute. + */ + @Override + public Expression getExpression(ResolvedFieldReference[] fieldAccesses) { + ResolvedFieldReference fieldAccess = fieldAccesses[0]; + DataType type = fromLegacyInfoToDataType(fieldAccess.resultType()); + + FieldReferenceExpression fieldReferenceExpr = new FieldReferenceExpression( + fieldAccess.name(), + type, + 0, + fieldAccess.fieldIndex()); + + switch (type.getLogicalType().getTypeRoot()) { + case BIGINT: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return fieldReferenceExpr; + case VARCHAR: + return unresolvedCall( + CAST, + fieldReferenceExpr, + typeLiteral(TIMESTAMP(3).bridgedTo(Timestamp.class))); + default: + throw new RuntimeException("Unsupport type: " + type); + } + } + + @Override + public Map<String, String> toProperties() { + Map<String, String> map = new HashMap<>(); + map.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD); + map.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field); + return map; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ExistingField that = (ExistingField) o; + return field.equals(that.field); + } + + @Override + public int hashCode() { + return field.hashCode(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java index 515dcd0..8be0679 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java @@ -21,18 +21,37 @@ package org.apache.flink.table.expressions; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.sources.FieldComputer; +import org.apache.flink.util.Preconditions; /** * A reference to a field in an input which has been resolved. * * <p>Note: This interface is added as a temporary solution. It is used to keep api compatible - * for {@link FieldComputer}. In the long term, this interface can be removed when we unify - * the {@link Expression} and {@code PlannerExpression}. + * for {@link FieldComputer}. In the long term, this interface can be removed. */ @PublicEvolving -public interface ResolvedFieldReference { +public class ResolvedFieldReference { - TypeInformation<?> resultType(); + private final String name; + private final TypeInformation<?> resultType; + private final int fieldIndex; - String name(); + public ResolvedFieldReference(String name, TypeInformation<?> resultType, int fieldIndex) { + Preconditions.checkArgument(fieldIndex >= 0, "Index of field should be a positive number"); + this.name = Preconditions.checkNotNull(name, "Field name must not be null."); + this.resultType = Preconditions.checkNotNull(resultType, "Field result type must not be null."); + this.fieldIndex = fieldIndex; + } + + public TypeInformation<?> resultType() { + return resultType; + } + + public String name() { + return name; + } + + public int fieldIndex() { + return fieldIndex; + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala deleted file mode 100644 index 0ad1f5e..0000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.expressions - -import org.apache.flink.api.common.typeinfo.TypeInformation - -case class ExestingFieldFieldReference( - name: String, - resultType: TypeInformation[_], - fieldIndex: Int) extends ResolvedFieldReference diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala index 7f7397f..fff6b59 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala @@ -36,20 +36,20 @@ object PlannerExpressionUtils { } private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr match { - case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) => + case r: PlannerResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) => true case _ => false } private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = expr match { - case r: ResolvedFieldReference + case r: PlannerResolvedFieldReference if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) => true case _ => false } private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = expr match { - case r: ResolvedFieldReference + case r: PlannerResolvedFieldReference if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) => true case _ => false diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala index 406571f..46059b1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -84,9 +84,9 @@ case class OverCall( // check partitionBy expression keys are resolved field reference partitionBy.foreach { - case r: ResolvedFieldReference if r.resultType.isKeyType => + case r: PlannerResolvedFieldReference if r.resultType.isKeyType => ValidationSuccess - case r: ResolvedFieldReference => + case r: PlannerResolvedFieldReference => return ValidationFailure(s"Invalid PartitionBy expression: $r. " + s"Expression must return key type.") case r => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala index 2b49cd2..64a2f63 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala @@ -92,7 +92,7 @@ case class GetCompositeField(child: PlannerExpression, key: Any) extends UnaryEx } else { None } - case c: ResolvedFieldReference => + case c: PlannerResolvedFieldReference => val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key Some(s"${c.name}$$$keySuffix") case _ => None diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala index bd27739..6d19c8f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala @@ -67,7 +67,7 @@ case class UnresolvedFieldReference(name: String) extends Attribute { case class PlannerResolvedFieldReference( name: String, - resultType: TypeInformation[_]) extends Attribute with ResolvedFieldReference { + resultType: TypeInformation[_]) extends Attribute { override def toString = s"'$name" diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala index 59a8e03..7a750ee 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala @@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.{DataTypes, ValidationException} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall} -import org.apache.flink.table.expressions.{ExestingFieldFieldReference, ResolvedFieldReference, RexNodeConverter} +import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral} +import org.apache.flink.table.expressions.{ResolvedFieldReference, RexNodeConverter} import org.apache.flink.table.functions.BuiltInFunctionDefinitions import org.apache.flink.table.types.LogicalTypeDataTypeConverter import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType @@ -273,17 +273,20 @@ object TableSourceUtil { // push an empty values node with the physical schema on the relbuilder relBuilder.push(createSchemaRelNode(resolvedFields)) // get extraction expression - resolvedFields.map(f => ExestingFieldFieldReference(f._1, f._3, f._2)) + resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2)) } else { new Array[ResolvedFieldReference](0) } val expression = tsExtractor.getExpression(fieldAccesses) // add cast to requested type and convert expression to RexNode + // blink runner treats numeric types as seconds in the cast of timestamp and numerical types. + // So we use REINTERPRET_CAST to keep the mills of numeric types. val castExpression = unresolvedCall( - BuiltInFunctionDefinitions.CAST, + BuiltInFunctionDefinitions.REINTERPRET_CAST, expression, - typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp]))) + typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])), + valueLiteral(false)) val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder)) relBuilder.clear() rexExpression diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala deleted file mode 100644 index 6b39883..0000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources.tsextractors - -import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, TypeInformation, Types} -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.descriptors.Rowtime -import org.apache.flink.table.expressions._ -import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall, valueLiteral} -import org.apache.flink.table.functions.BuiltInFunctionDefinitions -import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType - -import java.util - -/** - * Converts an existing [[Long]], [[java.sql.Timestamp]], or - * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into - * a rowtime attribute. - * - * @param field The field to convert into a rowtime attribute. - */ -final class ExistingField(val field: String) extends TimestampExtractor { - - override def getArgumentFields: Array[String] = Array(field) - - @throws[ValidationException] - override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = { - val fieldType = argumentFieldTypes(0) - - fieldType match { - case Types.LONG => // OK - case Types.SQL_TIMESTAMP => // OK - case Types.LOCAL_DATE_TIME => // OK - case Types.STRING => // OK - case _: TypeInformation[_] => - throw new ValidationException( - s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.") - } - } - - /** - * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or - * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") - * into a rowtime attribute. - */ - override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = { - val fieldAccess: ExestingFieldFieldReference = fieldAccesses(0) - .asInstanceOf[ExestingFieldFieldReference] - - val fieldReferenceExpr = new FieldReferenceExpression( - fieldAccess.name, - fromLegacyInfoToDataType(fieldAccess.resultType), - 0, - fieldAccess.fieldIndex) - - fieldAccess.resultType match { - case Types.LONG => - // access LONG field - val innerDiv = unresolvedCall( - BuiltInFunctionDefinitions.DIVIDE, - fieldReferenceExpr, - valueLiteral(new java.math.BigDecimal(1000))) - - unresolvedCall( - BuiltInFunctionDefinitions.CAST, - innerDiv, - typeLiteral(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP))) - - case Types.SQL_TIMESTAMP | LocalTimeTypeInfo.LOCAL_DATE_TIME => - fieldReferenceExpr - - case Types.STRING => - unresolvedCall( - BuiltInFunctionDefinitions.CAST, - fieldReferenceExpr, - typeLiteral(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP))) - } - } - - override def equals(other: Any): Boolean = other match { - case that: ExistingField => field == that.field - case _ => false - } - - override def hashCode(): Int = { - field.hashCode - } - - override def toProperties: util.Map[String, String] = { - val javaMap = new util.HashMap[String, String]() - javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) - javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field) - javaMap - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java index d863297..80c297e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.rules.logical; -import org.apache.flink.table.expressions.ResolvedFieldReference; +import org.apache.flink.table.expressions.PlannerResolvedFieldReference; import org.apache.flink.table.plan.logical.LogicalWindow; import org.apache.flink.table.plan.logical.rel.LogicalTableAggregate; import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate; @@ -207,7 +207,7 @@ public class ExtendedAggregateExtractProjectRule extends AggregateExtractProject } private int getWindowTimeFieldIndex(LogicalWindow logicalWindow, RelNode input) { - ResolvedFieldReference timeAttribute = (ResolvedFieldReference) logicalWindow.timeAttribute(); + PlannerResolvedFieldReference timeAttribute = (PlannerResolvedFieldReference) logicalWindow.timeAttribute(); return input.getRowType().getFieldNames().indexOf(timeAttribute.name()); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala index 7f7397f..fff6b59 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala @@ -36,20 +36,20 @@ object PlannerExpressionUtils { } private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr match { - case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) => + case r: PlannerResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) => true case _ => false } private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = expr match { - case r: ResolvedFieldReference + case r: PlannerResolvedFieldReference if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) => true case _ => false } private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = expr match { - case r: ResolvedFieldReference + case r: PlannerResolvedFieldReference if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) => true case _ => false diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala index 508a7f2..f2da6fc 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -172,9 +172,9 @@ case class OverCall( // check partitionBy expression keys are resolved field reference partitionBy.foreach { - case r: ResolvedFieldReference if r.resultType.isKeyType => + case r: PlannerResolvedFieldReference if r.resultType.isKeyType => ValidationSuccess - case r: ResolvedFieldReference => + case r: PlannerResolvedFieldReference => return ValidationFailure(s"Invalid PartitionBy expression: $r. " + s"Expression must return key type.") case r => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala index 1f858a1..3e2c374 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala @@ -100,7 +100,7 @@ case class GetCompositeField(child: PlannerExpression, key: Any) extends UnaryEx } else { None } - case c: ResolvedFieldReference => + case c: PlannerResolvedFieldReference => val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key Some(s"${c.name}$$$keySuffix") case _ => None diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala index 56d4e72..ced9b32 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala @@ -71,7 +71,7 @@ case class UnresolvedFieldReference(name: String) extends Attribute { case class PlannerResolvedFieldReference( name: String, - resultType: TypeInformation[_]) extends Attribute with ResolvedFieldReference { + resultType: TypeInformation[_]) extends Attribute { override def toString = s"'$name" diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala index 2b1c1fb..e381464 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin import org.apache.flink.table.api.{StreamQueryConfig, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.expressions.PlannerExpressionUtils._ -import org.apache.flink.table.expressions.ResolvedFieldReference +import org.apache.flink.table.expressions.PlannerResolvedFieldReference import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregateBase._ @@ -135,7 +135,7 @@ abstract class DataStreamGroupWindowAggregateBase( val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) { // copy the window rowtime attribute into the StreamRecord timestamp field - val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name + val timeAttribute = window.timeAttribute.asInstanceOf[PlannerResolvedFieldReference].name val timeIdx = inputSchema.fieldNames.indexOf(timeAttribute) if (timeIdx < 0) { throw new TableException("Time attribute could not be found. This is a bug.") diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala index ccc4733..f91f2fa 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.sources import java.sql.Timestamp - import com.google.common.collect.ImmutableList import org.apache.calcite.plan.RelOptCluster import org.apache.calcite.rel.RelNode @@ -29,10 +28,12 @@ import org.apache.calcite.rex.{RexLiteral, RexNode} import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.api.{DataTypes, TableException, Types, ValidationException} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.{Cast, PlannerExpression, PlannerResolvedFieldReference, ResolvedFieldReference} -import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo +import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall} +import org.apache.flink.table.expressions.{PlannerExpressionConverter, ResolvedFieldReference} +import org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST +import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import scala.collection.JavaConverters._ @@ -266,17 +267,23 @@ object TableSourceUtil { // push an empty values node with the physical schema on the relbuilder relBuilder.push(createSchemaRelNode(resolvedFields)) // get extraction expression - resolvedFields.map(f => PlannerResolvedFieldReference(f._1, f._3)) + resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2)) } else { - new Array[PlannerResolvedFieldReference](0) + new Array[ResolvedFieldReference](0) } - val expression = tsExtractor - .getExpression(fieldAccesses.map(_.asInstanceOf[ResolvedFieldReference])) + val expression = tsExtractor.getExpression(fieldAccesses) // add cast to requested type and convert expression to RexNode - // TODO we cast to planner expressions as a temporary solution to keep the old interfaces - val rexExpression = Cast(expression.asInstanceOf[PlannerExpression], resultType) - .toRexNode(relBuilder) + // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast + // from Timestamp is java.sql.Timestamp. So we need cast to long first. + val castExpression = unresolvedCall(CAST, + unresolvedCall(CAST, expression, typeLiteral(DataTypes.BIGINT())), + typeLiteral(fromLegacyInfoToDataType(resultType))) + + // TODO we convert to planner expressions as a temporary solution + val rexExpression = castExpression + .accept(PlannerExpressionConverter.INSTANCE) + .toRexNode(relBuilder) relBuilder.clear() rexExpression } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala deleted file mode 100644 index c9f4477..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources.tsextractors - -import java.util - -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation, Types} -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.descriptors.Rowtime -import org.apache.flink.table.expressions._ - -/** - * Converts an existing [[Long]], [[java.sql.Timestamp]], or - * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into - * a rowtime attribute. - * - * @param field The field to convert into a rowtime attribute. - */ -final class ExistingField(val field: String) extends TimestampExtractor { - - override def getArgumentFields: Array[String] = Array(field) - - @throws[ValidationException] - override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = { - val fieldType = argumentFieldTypes(0) - - fieldType match { - case Types.LONG => // OK - case Types.SQL_TIMESTAMP => // OK - case Types.STRING => // OK - case _: TypeInformation[_] => - throw new ValidationException( - s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.") - } - } - - /** - * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or - * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") - * into a rowtime attribute. - */ - override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): PlannerExpression = { - val fieldAccess: PlannerExpression = fieldAccesses(0).asInstanceOf[PlannerExpression] - - fieldAccess.resultType match { - case Types.LONG => - // access LONG field - fieldAccess - case Types.SQL_TIMESTAMP => - // cast timestamp to long - Cast(fieldAccess, Types.LONG) - case Types.STRING => - Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG) - } - } - - override def equals(other: Any): Boolean = other match { - case that: ExistingField => field == that.field - case _ => false - } - - override def hashCode(): Int = { - field.hashCode - } - - override def toProperties: util.Map[String, String] = { - val javaMap = new util.HashMap[String, String]() - javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) - javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field) - javaMap - } -} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala index 3424088..0f51c76 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala @@ -19,15 +19,18 @@ package org.apache.flink.table.descriptors import java.util - import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{DataTypes, ValidationException} import org.apache.flink.table.descriptors.RowtimeTest.{CustomAssigner, CustomExtractor} import org.apache.flink.table.expressions._ +import org.apache.flink.table.expressions.utils.ApiExpressionUtils +import org.apache.flink.table.functions.BuiltInFunctionDefinitions import org.apache.flink.table.sources.tsextractors.TimestampExtractor import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner +import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.types.Row + import org.junit.Test import scala.collection.JavaConverters._ @@ -130,9 +133,17 @@ object RowtimeTest { } override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = { - val fieldAccess: PlannerExpression = fieldAccesses(0).asInstanceOf[PlannerExpression] + val fieldAccess = fieldAccesses(0) require(fieldAccess.resultType == Types.SQL_TIMESTAMP) - Cast(fieldAccess, Types.LONG) + val fieldReferenceExpr = new FieldReferenceExpression( + fieldAccess.name, + TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType), + 0, + fieldAccess.fieldIndex) + ApiExpressionUtils.unresolvedCall( + BuiltInFunctionDefinitions.CAST, + fieldReferenceExpr, + ApiExpressionUtils.typeLiteral(DataTypes.BIGINT())) } override def equals(other: Any): Boolean = other match { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala index 1372da5..4f767f6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala @@ -161,11 +161,11 @@ class TestFilterableTableSource( private def shouldPushDown(expr: BinaryComparison): Boolean = { (expr.left, expr.right) match { - case (f: ResolvedFieldReference, v: Literal) => + case (f: PlannerResolvedFieldReference, v: Literal) => filterableFields.contains(f.name) - case (v: Literal, f: ResolvedFieldReference) => + case (v: Literal, f: PlannerResolvedFieldReference) => filterableFields.contains(f.name) - case (f1: ResolvedFieldReference, f2: ResolvedFieldReference) => + case (f1: PlannerResolvedFieldReference, f2: PlannerResolvedFieldReference) => filterableFields.contains(f1.name) && filterableFields.contains(f2.name) case (_, _) => false } @@ -184,15 +184,15 @@ class TestFilterableTableSource( expr match { case _: GreaterThan => lhsValue.compareTo(rhsValue) > 0 - case LessThan(l: ResolvedFieldReference, r: Literal) => + case LessThan(l: PlannerResolvedFieldReference, r: Literal) => lhsValue.compareTo(rhsValue) < 0 - case GreaterThanOrEqual(l: ResolvedFieldReference, r: Literal) => + case GreaterThanOrEqual(l: PlannerResolvedFieldReference, r: Literal) => lhsValue.compareTo(rhsValue) >= 0 - case LessThanOrEqual(l: ResolvedFieldReference, r: Literal) => + case LessThanOrEqual(l: PlannerResolvedFieldReference, r: Literal) => lhsValue.compareTo(rhsValue) <= 0 - case EqualTo(l: ResolvedFieldReference, r: Literal) => + case EqualTo(l: PlannerResolvedFieldReference, r: Literal) => lhsValue.compareTo(rhsValue) == 0 - case NotEqualTo(l: ResolvedFieldReference, r: Literal) => + case NotEqualTo(l: PlannerResolvedFieldReference, r: Literal) => lhsValue.compareTo(rhsValue) != 0 } } @@ -201,12 +201,12 @@ class TestFilterableTableSource( : (Comparable[Any], Comparable[Any]) = { (expr.left, expr.right) match { - case (l: ResolvedFieldReference, r: Literal) => + case (l: PlannerResolvedFieldReference, r: Literal) => val idx = rowTypeInfo.getFieldIndex(l.name) val lv = row.getField(idx).asInstanceOf[Comparable[Any]] val rv = r.value.asInstanceOf[Comparable[Any]] (lv, rv) - case (l: Literal, r: ResolvedFieldReference) => + case (l: Literal, r: PlannerResolvedFieldReference) => val idx = rowTypeInfo.getFieldIndex(r.name) val lv = l.value.asInstanceOf[Comparable[Any]] val rv = row.getField(idx).asInstanceOf[Comparable[Any]] @@ -215,7 +215,7 @@ class TestFilterableTableSource( val lv = l.value.asInstanceOf[Comparable[Any]] val rv = r.value.asInstanceOf[Comparable[Any]] (lv, rv) - case (l: ResolvedFieldReference, r: ResolvedFieldReference) => + case (l: PlannerResolvedFieldReference, r: PlannerResolvedFieldReference) => val lidx = rowTypeInfo.getFieldIndex(l.name) val ridx = rowTypeInfo.getFieldIndex(r.name) val lv = row.getField(lidx).asInstanceOf[Comparable[Any]]