Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3808#discussion_r114923783
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -577,70 +577,94 @@ abstract class TableEnvironment(val config: TableConfig) {
     
       /**
         * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
    -    * [[Expression]].
    +    * [[Expression]]. It does not handle time attributes but considers them in indices, if
    +    * ignore flag is not false.
         *
         * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
         * @param exprs The expressions that define the field names.
    +    * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
         * @tparam A The type of the TypeInformation.
         * @return A tuple of two arrays holding the field names and corresponding field positions.
         */
       protected[flink] def getFieldInfo[A](
    -    inputType: TypeInformation[A],
    -    exprs: Array[Expression]): (Array[String], Array[Int]) = {
    +      inputType: TypeInformation[A],
    +      exprs: Array[Expression],
    +      ignoreTimeAttributes: Boolean)
    +    : (Array[String], Array[Int]) = {
     
         TableEnvironment.validateType(inputType)
     
    +    val filteredExprs = if (ignoreTimeAttributes) {
    +        exprs.map {
    +          case ta: TimeAttribute => ta.expression
    +          case e@_ => e
    +        }
    +    } else {
    +      exprs
    +    }
    +
         val indexedNames: Array[(Int, String)] = inputType match {
           case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
             throw new TableException(
               "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
                 "Please specify the type of the input with a RowTypeInfo.")
           case a: AtomicType[A] =>
    -        if (exprs.length != 1) {
    -          throw new TableException("Table of atomic type can only have a single field.")
    -        }
    -        exprs.map {
    -          case UnresolvedFieldReference(name) => (0, name)
    +        filteredExprs.zipWithIndex flatMap {
    +          case (UnresolvedFieldReference(name), idx) =>
    +            if (idx > 0) {
    +              throw new TableException("Table of atomic type can only have a single field.")
    +            }
    +            Some((0, name))
    +          case (_: TimeAttribute, _) if ignoreTimeAttributes =>
    --- End diff --
    
    Can we remove this `if ignoreTimeAttributes` guard? In DataStream mode we also need to parse the `TimeAttribute`, e.g.:
    `streamTestUtil().addTable[String]('string, 't.rowtime)`
    What do you think?
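
To make the question concrete, here is a minimal sketch of how the guard changes the match. It does not use the real Flink classes: `Expression`, `UnresolvedFieldReference` and `TimeAttribute` below are simplified stand-ins, `GuardSketch`, `guarded` and `unguarded` are hypothetical names, and the catch-all case that rejects other expressions is an assumption, since the diff is cut off before the remaining cases.

```scala
// Simplified stand-ins for the Table API expression types (assumptions, not the Flink classes).
sealed trait Expression
case class UnresolvedFieldReference(name: String) extends Expression
case class TimeAttribute(expression: Expression) extends Expression

object GuardSketch {
  // With the guard: a time attribute is skipped only when the flag is true.
  // When the flag is false it falls through to the (assumed) catch-all case.
  def guarded(exprs: Seq[Expression], ignoreTimeAttributes: Boolean): Seq[(Int, String)] =
    exprs.zipWithIndex.flatMap {
      case (UnresolvedFieldReference(name), _) => Some((0, name))
      case (_: TimeAttribute, _) if ignoreTimeAttributes => None
      case (other, _) =>
        throw new IllegalArgumentException(s"Unsupported expression: $other")
    }

  // Without the guard: a time attribute is skipped regardless of the flag.
  def unguarded(exprs: Seq[Expression]): Seq[(Int, String)] =
    exprs.zipWithIndex.flatMap {
      case (UnresolvedFieldReference(name), _) => Some((0, name))
      case (_: TimeAttribute, _) => None
      case (other, _) =>
        throw new IllegalArgumentException(s"Unsupported expression: $other")
    }
}
```

Under these assumptions, an input shaped like `'string, 't.rowtime` passes through `unguarded`, while `guarded` with the flag set to `false` rejects it.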

