[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278505#comment-15278505
 ] 

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62714636
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.logical
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataTypeFactory
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rel.logical.LogicalProject
    +import org.apache.calcite.schema.{Table => CTable}
    +import org.apache.calcite.tools.RelBuilder
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.java.operators.join.JoinType
    +import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
    +import org.apache.flink.api.table.expressions._
    +import org.apache.flink.api.table.typeutils.TypeConverter
    +import org.apache.flink.api.table.validate.ValidationException
    +
    +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
    +  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
    +
    +  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
    +    val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
    +    val newProjectList =
    +      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
    +        e match {
    +          case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
    +            case ne: NamedExpression => ne
    +            case e if !e.valid => u
    +            case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
    +            case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
    +          }
    +          case _ => throw new IllegalArgumentException
    +        }
    +    }
    +    Project(newProjectList, child)
    +  }
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    val allAlias = projectList.forall(_.isInstanceOf[Alias])
    +    child.toRelNode(relBuilder)
    +    if (allAlias) {
    +      // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
    +      //   Add a projection ourselves (will be automatically removed by 
translation rules).
    +      relBuilder.push(
    +        LogicalProject.create(relBuilder.peek(),
    +          projectList.map(_.toRexNode(relBuilder)).asJava,
    +          projectList.map(_.name).asJava))
    +    } else {
    +      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
    +    }
    +  }
    +}
    +
    +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
    +  override def output: Seq[Attribute] =
    +    throw new UnresolvedException("Invalid call to output on AliasNode")
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
    +    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
    +
    +  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
    +    if (aliasList.length > child.output.length) {
    +      failValidation("Aliasing more fields than we actually have")
    +    } else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
    +      failValidation("`as` only allow string arguments")
    +    } else {
    +      val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
    +      val input = child.output
    +      Project(
    +        names.zip(input).map { case (name, attr) =>
    +          Alias(attr, name)} ++ input.drop(names.length), child)
    +    }
    +  }
    +}
    +
    +case class Distinct(child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.distinct()
    +  }
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
    +      failValidation(s"Distinct on stream tables is currently not 
supported.")
    +    }
    +    this
    +  }
    +}
    +
    +case class Sort(order: Seq[Ordering], child: LogicalNode) extends 
UnaryNode {
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
    +  }
    +}
    +
    +case class Filter(condition: Expression, child: LogicalNode) extends 
UnaryNode {
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.filter(condition.toRexNode(relBuilder))
    +  }
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    val resolvedFilter = super.validate(tableEnv).asInstanceOf[Filter]
    +    if (resolvedFilter.condition.dataType != BOOLEAN_TYPE_INFO) {
    +      failValidation(s"filter expression ${resolvedFilter.condition} of" +
    +        s" ${resolvedFilter.condition.dataType} is not a boolean")
    +    }
    +    resolvedFilter
    +  }
    +}
    +
    +case class Aggregate(
    +    groupingExpressions: Seq[Expression],
    +    aggregateExpressions: Seq[NamedExpression],
    +    child: LogicalNode) extends UnaryNode {
    +
    +  override def output: Seq[Attribute] = {
    +    (groupingExpressions ++ aggregateExpressions) map { agg =>
    +      agg match {
    +        case ne: NamedExpression => ne.toAttribute
    +        case e => Alias(e, e.toString).toAttribute
    +      }
    +    }
    +  }
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.aggregate(
    +      
relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
    +      aggregateExpressions.filter(_.isInstanceOf[Alias]).map { e =>
    +        e match {
    +          case Alias(agg: Aggregation, name) => 
agg.toAggCall(name)(relBuilder)
    +          case _ => null // this should never happen
    +        }
    +      }.asJava)
    +  }
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
    +      failValidation(s"Aggregate on stream tables is currently not 
supported.")
    +    }
    +
    +    val resolvedAggregate = 
super.validate(tableEnv).asInstanceOf[Aggregate]
    +    val groupingExprs = resolvedAggregate.groupingExpressions
    +    val aggregateExprs = resolvedAggregate.aggregateExpressions
    +    aggregateExprs.foreach(validateAggregateExpression)
    +    groupingExprs.foreach(validateGroupingExpression)
    +
    +    def validateAggregateExpression(expr: Expression): Unit = expr match {
    +      // check no nested aggregation exists.
    +      case aggExpr: Aggregation =>
    +        aggExpr.children.foreach { child =>
    --- End diff --
    
    ```
    aggExpr.children.foreach { child =>
              child.foreach {
    ```
    the `child.foreach` method would first check itself then its own children, 
therefore not only one level deep. I will rewrite this the next commit.


> Add a validation phase before construct RelNode using TableAPI
> --------------------------------------------------------------
>
>                 Key: FLINK-3754
>                 URL: https://issues.apache.org/jira/browse/FLINK-3754
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API
>    Affects Versions: 1.0.0
>            Reporter: Yijie Shen
>            Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to