This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 514b51de832fec320caf9cc73eb39d35910a1c79
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Wed Jul 17 20:23:37 2019 +0800

    [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer
---
 .../table/sources/tsextractors/ExistingField.java  | 136 +++++++++++++++++++++
 .../table/expressions/ResolvedFieldReference.java  |  29 ++++-
 .../expressions/ExestingFieldFieldReference.scala  |  26 ----
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../flink/table/sources/TableSourceUtil.scala      |  13 +-
 .../table/sources/tsextractors/ExistingField.scala | 111 -----------------
 .../ExtendedAggregateExtractProjectRule.java       |   4 +-
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../DataStreamGroupWindowAggregateBase.scala       |   4 +-
 .../flink/table/sources/TableSourceUtil.scala      |  29 +++--
 .../table/sources/tsextractors/ExistingField.scala |  88 -------------
 .../flink/table/descriptors/RowtimeTest.scala      |  19 ++-
 .../table/utils/TestFilterableTableSource.scala    |  22 ++--
 19 files changed, 230 insertions(+), 279 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
new file mode 100644
index 0000000..0e8d73f
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.types.DataType;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Converts an existing {@link Long}, {@link java.sql.Timestamp}, or
+ * timestamp formatted java.lang.String field (e.g., "2018-05-28 12:34:56.000") into
+ * a rowtime attribute.
+ */
+@PublicEvolving
+public final class ExistingField extends TimestampExtractor {
+
+       private static final long serialVersionUID = 1L;
+
+       private String field;
+
+       /**
+        * @param field The field to convert into a rowtime attribute.
+        */
+       public ExistingField(String field) {
+               this.field = checkNotNull(field);
+       }
+
+       @Override
+       public String[] getArgumentFields() {
+               return new String[] {field};
+       }
+
+       @Override
+       public void validateArgumentFields(TypeInformation<?>[] 
argumentFieldTypes) {
+               DataType fieldType = 
fromLegacyInfoToDataType(argumentFieldTypes[0]);
+
+               switch (fieldType.getLogicalType().getTypeRoot()) {
+                       case BIGINT:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                       case VARCHAR:
+                               break;
+                       default:
+                               throw new ValidationException(String.format(
+                                               "Field '%s' must be of type 
Long or Timestamp or String but is of type %s.",
+                                               field, fieldType));
+               }
+       }
+
+       /**
+        * Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp}, or
+        * timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000")
+        * into a rowtime attribute.
+        */
+       @Override
+       public Expression getExpression(ResolvedFieldReference[] fieldAccesses) 
{
+               ResolvedFieldReference fieldAccess = fieldAccesses[0];
+               DataType type = 
fromLegacyInfoToDataType(fieldAccess.resultType());
+
+               FieldReferenceExpression fieldReferenceExpr = new 
FieldReferenceExpression(
+                               fieldAccess.name(),
+                               type,
+                               0,
+                               fieldAccess.fieldIndex());
+
+               switch (type.getLogicalType().getTypeRoot()) {
+                       case BIGINT:
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return fieldReferenceExpr;
+                       case VARCHAR:
+                               return unresolvedCall(
+                                               CAST,
+                                               fieldReferenceExpr,
+                                               
typeLiteral(TIMESTAMP(3).bridgedTo(Timestamp.class)));
+                       default:
+                               throw new RuntimeException("Unsupport type: " + 
type);
+               }
+       }
+
+       @Override
+       public Map<String, String> toProperties() {
+               Map<String, String> map = new HashMap<>();
+               map.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, 
Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD);
+               map.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field);
+               return map;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               ExistingField that = (ExistingField) o;
+               return field.equals(that.field);
+       }
+
+       @Override
+       public int hashCode() {
+               return field.hashCode();
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java
index 515dcd0..8be0679 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ResolvedFieldReference.java
@@ -21,18 +21,37 @@ package org.apache.flink.table.expressions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.sources.FieldComputer;
+import org.apache.flink.util.Preconditions;
 
 /**
  * A reference to a field in an input which has been resolved.
  *
  * <p>Note: This interface is added as a temporary solution. It is used to keep api compatible
- * for {@link FieldComputer}. In the long term, this interface can be removed when we unify
- * the {@link Expression} and {@code PlannerExpression}.
+ * for {@link FieldComputer}. In the long term, this interface can be removed.
  */
 @PublicEvolving
-public interface ResolvedFieldReference {
+public class ResolvedFieldReference {
 
-       TypeInformation<?> resultType();
+       private final String name;
+       private final TypeInformation<?> resultType;
+       private final int fieldIndex;
 
-       String name();
+       public ResolvedFieldReference(String name, TypeInformation<?> 
resultType, int fieldIndex) {
+               Preconditions.checkArgument(fieldIndex >= 0, "Index of field 
should be a positive number");
+               this.name = Preconditions.checkNotNull(name, "Field name must 
not be null.");
+               this.resultType = Preconditions.checkNotNull(resultType, "Field 
result type must not be null.");
+               this.fieldIndex = fieldIndex;
+       }
+
+       public TypeInformation<?> resultType() {
+               return resultType;
+       }
+
+       public String name() {
+               return name;
+       }
+
+       public int fieldIndex() {
+               return fieldIndex;
+       }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
deleted file mode 100644
index 0ad1f5e..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class ExestingFieldFieldReference(
-    name: String,
-    resultType: TypeInformation[_],
-    fieldIndex: Int) extends ResolvedFieldReference
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
index 7f7397f..fff6b59 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -36,20 +36,20 @@ object PlannerExpressionUtils {
   }
 
   private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr 
match {
-    case r: ResolvedFieldReference if 
FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
+    case r: PlannerResolvedFieldReference if 
FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = 
expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = 
expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
       true
     case _ => false
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
index 406571f..46059b1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -84,9 +84,9 @@ case class OverCall(
 
     // check partitionBy expression keys are resolved field reference
     partitionBy.foreach {
-      case r: ResolvedFieldReference if r.resultType.isKeyType  =>
+      case r: PlannerResolvedFieldReference if r.resultType.isKeyType  =>
         ValidationSuccess
-      case r: ResolvedFieldReference =>
+      case r: PlannerResolvedFieldReference =>
         return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
           s"Expression must return key type.")
       case r =>
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
index 2b49cd2..64a2f63 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -92,7 +92,7 @@ case class GetCompositeField(child: PlannerExpression, key: 
Any) extends UnaryEx
       } else {
         None
       }
-    case c: ResolvedFieldReference =>
+    case c: PlannerResolvedFieldReference =>
       val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key
       Some(s"${c.name}$$$keySuffix")
     case _ => None
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index bd27739..6d19c8f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -67,7 +67,7 @@ case class UnresolvedFieldReference(name: String) extends 
Attribute {
 
 case class PlannerResolvedFieldReference(
     name: String,
-    resultType: TypeInformation[_]) extends Attribute with 
ResolvedFieldReference {
+    resultType: TypeInformation[_]) extends Attribute {
 
   override def toString = s"'$name"
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 59a8e03..7a750ee 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, 
unresolvedCall}
-import org.apache.flink.table.expressions.{ExestingFieldFieldReference, 
ResolvedFieldReference, RexNodeConverter}
+import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, 
unresolvedCall, valueLiteral}
+import org.apache.flink.table.expressions.{ResolvedFieldReference, 
RexNodeConverter}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter
 import 
org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -273,17 +273,20 @@ object TableSourceUtil {
           // push an empty values node with the physical schema on the relbuilder
           relBuilder.push(createSchemaRelNode(resolvedFields))
           // get extraction expression
-          resolvedFields.map(f => ExestingFieldFieldReference(f._1, f._3, 
f._2))
+          resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2))
         } else {
           new Array[ResolvedFieldReference](0)
         }
 
       val expression = tsExtractor.getExpression(fieldAccesses)
       // add cast to requested type and convert expression to RexNode
+      // The blink runner treats numeric values as seconds when casting between timestamps and
+      // numeric types, so we use REINTERPRET_CAST to keep the milliseconds of numeric values.
       val castExpression = unresolvedCall(
-        BuiltInFunctionDefinitions.CAST,
+        BuiltInFunctionDefinitions.REINTERPRET_CAST,
         expression,
-        typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])))
+        typeLiteral(DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp])),
+        valueLiteral(false))
       val rexExpression = castExpression.accept(new 
RexNodeConverter(relBuilder))
       relBuilder.clear()
       rexExpression
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
deleted file mode 100644
index 6b39883..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.tsextractors
-
-import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, 
TypeInformation, Types}
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.table.expressions._
-import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, 
unresolvedCall, valueLiteral}
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
-
-import java.util
-
-/**
-  * Converts an existing [[Long]], [[java.sql.Timestamp]], or
-  * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 
12:34:56.000") into
-  * a rowtime attribute.
-  *
-  * @param field The field to convert into a rowtime attribute.
-  */
-final class ExistingField(val field: String) extends TimestampExtractor {
-
-  override def getArgumentFields: Array[String] = Array(field)
-
-  @throws[ValidationException]
-  override def validateArgumentFields(argumentFieldTypes: 
Array[TypeInformation[_]]): Unit = {
-    val fieldType = argumentFieldTypes(0)
-
-    fieldType match {
-      case Types.LONG => // OK
-      case Types.SQL_TIMESTAMP => // OK
-      case Types.LOCAL_DATE_TIME => // OK
-      case Types.STRING => // OK
-      case _: TypeInformation[_] =>
-        throw new ValidationException(
-          s"Field '$field' must be of type Long or Timestamp or String but is 
of type $fieldType.")
-    }
-  }
-
-  /**
-    * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], 
or
-    * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 
12:34:56.000")
-    * into a rowtime attribute.
-    */
-  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
Expression = {
-    val fieldAccess: ExestingFieldFieldReference = fieldAccesses(0)
-        .asInstanceOf[ExestingFieldFieldReference]
-
-    val fieldReferenceExpr = new FieldReferenceExpression(
-      fieldAccess.name,
-      fromLegacyInfoToDataType(fieldAccess.resultType),
-      0,
-      fieldAccess.fieldIndex)
-
-    fieldAccess.resultType match {
-      case Types.LONG =>
-        // access LONG field
-        val innerDiv = unresolvedCall(
-          BuiltInFunctionDefinitions.DIVIDE,
-          fieldReferenceExpr,
-          valueLiteral(new java.math.BigDecimal(1000)))
-
-        unresolvedCall(
-          BuiltInFunctionDefinitions.CAST,
-          innerDiv,
-          typeLiteral(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP)))
-
-      case Types.SQL_TIMESTAMP | LocalTimeTypeInfo.LOCAL_DATE_TIME =>
-        fieldReferenceExpr
-
-      case Types.STRING =>
-        unresolvedCall(
-          BuiltInFunctionDefinitions.CAST,
-          fieldReferenceExpr,
-          typeLiteral(fromLegacyInfoToDataType(Types.SQL_TIMESTAMP)))
-    }
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ExistingField => field == that.field
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    field.hashCode
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, 
Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field)
-    javaMap
-  }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
index d863297..80c297e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/rules/logical/ExtendedAggregateExtractProjectRule.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.rules.logical;
 
-import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.expressions.PlannerResolvedFieldReference;
 import org.apache.flink.table.plan.logical.LogicalWindow;
 import org.apache.flink.table.plan.logical.rel.LogicalTableAggregate;
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate;
@@ -207,7 +207,7 @@ public class ExtendedAggregateExtractProjectRule extends 
AggregateExtractProject
        }
 
        private int getWindowTimeFieldIndex(LogicalWindow logicalWindow, 
RelNode input) {
-               ResolvedFieldReference timeAttribute = (ResolvedFieldReference) 
logicalWindow.timeAttribute();
+               PlannerResolvedFieldReference timeAttribute = 
(PlannerResolvedFieldReference) logicalWindow.timeAttribute();
                return 
input.getRowType().getFieldNames().indexOf(timeAttribute.name());
        }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
index 7f7397f..fff6b59 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionUtils.scala
@@ -36,20 +36,20 @@ object PlannerExpressionUtils {
   }
 
   private[flink] def isTimeAttribute(expr: PlannerExpression): Boolean = expr 
match {
-    case r: ResolvedFieldReference if 
FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
+    case r: PlannerResolvedFieldReference if 
FlinkTypeFactory.isTimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isRowtimeAttribute(expr: PlannerExpression): Boolean = 
expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) =>
       true
     case _ => false
   }
 
   private[flink] def isProctimeAttribute(expr: PlannerExpression): Boolean = 
expr match {
-    case r: ResolvedFieldReference
+    case r: PlannerResolvedFieldReference
       if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) =>
       true
     case _ => false
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index 508a7f2..f2da6fc 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -172,9 +172,9 @@ case class OverCall(
 
     // check partitionBy expression keys are resolved field reference
     partitionBy.foreach {
-      case r: ResolvedFieldReference if r.resultType.isKeyType  =>
+      case r: PlannerResolvedFieldReference if r.resultType.isKeyType  =>
         ValidationSuccess
-      case r: ResolvedFieldReference =>
+      case r: PlannerResolvedFieldReference =>
         return ValidationFailure(s"Invalid PartitionBy expression: $r. " +
           s"Expression must return key type.")
       case r =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala
index 1f858a1..3e2c374 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -100,7 +100,7 @@ case class GetCompositeField(child: PlannerExpression, key: 
Any) extends UnaryEx
       } else {
         None
       }
-    case c: ResolvedFieldReference =>
+    case c: PlannerResolvedFieldReference =>
       val keySuffix = if (key.isInstanceOf[Int]) s"_$key" else key
       Some(s"${c.name}$$$keySuffix")
     case _ => None
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index 56d4e72..ced9b32 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -71,7 +71,7 @@ case class UnresolvedFieldReference(name: String) extends 
Attribute {
 
 case class PlannerResolvedFieldReference(
     name: String,
-    resultType: TypeInformation[_]) extends Attribute with 
ResolvedFieldReference {
+    resultType: TypeInformation[_]) extends Attribute {
 
   override def toString = s"'$name"
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala
index 2b1c1fb..e381464 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregateBase.scala
@@ -29,7 +29,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin
 import org.apache.flink.table.api.{StreamQueryConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions.PlannerExpressionUtils._
-import org.apache.flink.table.expressions.ResolvedFieldReference
+import org.apache.flink.table.expressions.PlannerResolvedFieldReference
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import 
org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregateBase._
@@ -135,7 +135,7 @@ abstract class DataStreamGroupWindowAggregateBase(
 
     val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
       // copy the window rowtime attribute into the StreamRecord timestamp field
-      val timeAttribute = 
window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
+      val timeAttribute = 
window.timeAttribute.asInstanceOf[PlannerResolvedFieldReference].name
       val timeIdx = inputSchema.fieldNames.indexOf(timeAttribute)
       if (timeIdx < 0) {
         throw new TableException("Time attribute could not be found. This is a 
bug.")
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index ccc4733..f91f2fa 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.sources
 
 import java.sql.Timestamp
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
@@ -29,10 +28,12 @@ import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.api.{DataTypes, TableException, Types, 
ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Cast, PlannerExpression, 
PlannerResolvedFieldReference, ResolvedFieldReference}
-import 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
+import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, 
unresolvedCall}
+import org.apache.flink.table.expressions.{PlannerExpressionConverter, 
ResolvedFieldReference}
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST
+import 
org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, 
fromLegacyInfoToDataType}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 import scala.collection.JavaConverters._
@@ -266,17 +267,23 @@ object TableSourceUtil {
         // push an empty values node with the physical schema on the relbuilder
         relBuilder.push(createSchemaRelNode(resolvedFields))
         // get extraction expression
-        resolvedFields.map(f => PlannerResolvedFieldReference(f._1, f._3))
+        resolvedFields.map(f => new ResolvedFieldReference(f._1, f._3, f._2))
       } else {
-        new Array[PlannerResolvedFieldReference](0)
+        new Array[ResolvedFieldReference](0)
       }
 
-      val expression = tsExtractor
-        .getExpression(fieldAccesses.map(_.asInstanceOf[ResolvedFieldReference]))
+      val expression = tsExtractor.getExpression(fieldAccesses)
       // add cast to requested type and convert expression to RexNode
-      // TODO we cast to planner expressions as a temporary solution to keep the old interfaces
-      val rexExpression = Cast(expression.asInstanceOf[PlannerExpression], resultType)
-        .toRexNode(relBuilder)
+      // If resultType is TimeIndicatorTypeInfo, its internal format is long, but cast
+      // from Timestamp is java.sql.Timestamp. So we need cast to long first.
+      val castExpression = unresolvedCall(CAST,
+        unresolvedCall(CAST, expression, typeLiteral(DataTypes.BIGINT())),
+        typeLiteral(fromLegacyInfoToDataType(resultType)))
+
+      // TODO we convert to planner expressions as a temporary solution
+      val rexExpression = castExpression
+          .accept(PlannerExpressionConverter.INSTANCE)
+          .toRexNode(relBuilder)
       relBuilder.clear()
       rexExpression
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
deleted file mode 100644
index c9f4477..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.tsextractors
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation, Types}
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.table.expressions._
-
-/**
-  * Converts an existing [[Long]], [[java.sql.Timestamp]], or
-  * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000") into
-  * a rowtime attribute.
-  *
-  * @param field The field to convert into a rowtime attribute.
-  */
-final class ExistingField(val field: String) extends TimestampExtractor {
-
-  override def getArgumentFields: Array[String] = Array(field)
-
-  @throws[ValidationException]
-  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
-    val fieldType = argumentFieldTypes(0)
-
-    fieldType match {
-      case Types.LONG => // OK
-      case Types.SQL_TIMESTAMP => // OK
-      case Types.STRING => // OK
-      case _: TypeInformation[_] =>
-        throw new ValidationException(
-          s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
-    }
-  }
-
-  /**
-    * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or
-    * timestamp formatted [[java.lang.String]] field (e.g., "2018-05-28 12:34:56.000")
-    * into a rowtime attribute.
-    */
-  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): PlannerExpression = {
-    val fieldAccess: PlannerExpression = fieldAccesses(0).asInstanceOf[PlannerExpression]
-
-    fieldAccess.resultType match {
-      case Types.LONG =>
-        // access LONG field
-        fieldAccess
-      case Types.SQL_TIMESTAMP =>
-        // cast timestamp to long
-        Cast(fieldAccess, Types.LONG)
-      case Types.STRING =>
-        Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
-    }
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ExistingField => field == that.field
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    field.hashCode
-  }
-
-  override def toProperties: util.Map[String, String] = {
-    val javaMap = new util.HashMap[String, String]()
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)
-    javaMap.put(Rowtime.ROWTIME_TIMESTAMPS_FROM, field)
-    javaMap
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index 3424088..0f51c76 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -19,15 +19,18 @@
 package org.apache.flink.table.descriptors
 
 import java.util
-
 import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.descriptors.RowtimeTest.{CustomAssigner, CustomExtractor}
 import org.apache.flink.table.expressions._
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.sources.tsextractors.TimestampExtractor
 import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
+
 import org.junit.Test
 
 import scala.collection.JavaConverters._
@@ -130,9 +133,17 @@ object RowtimeTest {
     }
 
     override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-      val fieldAccess: PlannerExpression = fieldAccesses(0).asInstanceOf[PlannerExpression]
+      val fieldAccess = fieldAccesses(0)
       require(fieldAccess.resultType == Types.SQL_TIMESTAMP)
-      Cast(fieldAccess, Types.LONG)
+      val fieldReferenceExpr = new FieldReferenceExpression(
+        fieldAccess.name,
+        TypeConversions.fromLegacyInfoToDataType(fieldAccess.resultType),
+        0,
+        fieldAccess.fieldIndex)
+      ApiExpressionUtils.unresolvedCall(
+        BuiltInFunctionDefinitions.CAST,
+        fieldReferenceExpr,
+        ApiExpressionUtils.typeLiteral(DataTypes.BIGINT()))
     }
 
     override def equals(other: Any): Boolean = other match {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
index 1372da5..4f767f6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
@@ -161,11 +161,11 @@ class TestFilterableTableSource(
 
   private def shouldPushDown(expr: BinaryComparison): Boolean = {
     (expr.left, expr.right) match {
-      case (f: ResolvedFieldReference, v: Literal) =>
+      case (f: PlannerResolvedFieldReference, v: Literal) =>
         filterableFields.contains(f.name)
-      case (v: Literal, f: ResolvedFieldReference) =>
+      case (v: Literal, f: PlannerResolvedFieldReference) =>
         filterableFields.contains(f.name)
-      case (f1: ResolvedFieldReference, f2: ResolvedFieldReference) =>
+      case (f1: PlannerResolvedFieldReference, f2: PlannerResolvedFieldReference) =>
         filterableFields.contains(f1.name) && filterableFields.contains(f2.name)
       case (_, _) => false
     }
@@ -184,15 +184,15 @@ class TestFilterableTableSource(
     expr match {
       case _: GreaterThan =>
         lhsValue.compareTo(rhsValue) > 0
-      case LessThan(l: ResolvedFieldReference, r: Literal) =>
+      case LessThan(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) < 0
-      case GreaterThanOrEqual(l: ResolvedFieldReference, r: Literal) =>
+      case GreaterThanOrEqual(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) >= 0
-      case LessThanOrEqual(l: ResolvedFieldReference, r: Literal) =>
+      case LessThanOrEqual(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) <= 0
-      case EqualTo(l: ResolvedFieldReference, r: Literal) =>
+      case EqualTo(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) == 0
-      case NotEqualTo(l: ResolvedFieldReference, r: Literal) =>
+      case NotEqualTo(l: PlannerResolvedFieldReference, r: Literal) =>
         lhsValue.compareTo(rhsValue) != 0
     }
   }
@@ -201,12 +201,12 @@ class TestFilterableTableSource(
     : (Comparable[Any], Comparable[Any]) = {
 
     (expr.left, expr.right) match {
-      case (l: ResolvedFieldReference, r: Literal) =>
+      case (l: PlannerResolvedFieldReference, r: Literal) =>
         val idx = rowTypeInfo.getFieldIndex(l.name)
         val lv = row.getField(idx).asInstanceOf[Comparable[Any]]
         val rv = r.value.asInstanceOf[Comparable[Any]]
         (lv, rv)
-      case (l: Literal, r: ResolvedFieldReference) =>
+      case (l: Literal, r: PlannerResolvedFieldReference) =>
         val idx = rowTypeInfo.getFieldIndex(r.name)
         val lv = l.value.asInstanceOf[Comparable[Any]]
         val rv = row.getField(idx).asInstanceOf[Comparable[Any]]
@@ -215,7 +215,7 @@ class TestFilterableTableSource(
         val lv = l.value.asInstanceOf[Comparable[Any]]
         val rv = r.value.asInstanceOf[Comparable[Any]]
         (lv, rv)
-      case (l: ResolvedFieldReference, r: ResolvedFieldReference) =>
+      case (l: PlannerResolvedFieldReference, r: PlannerResolvedFieldReference) =>
         val lidx = rowTypeInfo.getFieldIndex(l.name)
         val ridx = rowTypeInfo.getFieldIndex(r.name)
         val lv = row.getField(lidx).asInstanceOf[Comparable[Any]]

Reply via email to