[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278196#comment-15278196
 ] 

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62683790
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.logical
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataTypeFactory
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rel.logical.LogicalProject
    +import org.apache.calcite.schema.{Table => CTable}
    +import org.apache.calcite.tools.RelBuilder
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.java.operators.join.JoinType
    +import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
    +import org.apache.flink.api.table.expressions._
    +import org.apache.flink.api.table.typeutils.TypeConverter
    +import org.apache.flink.api.table.validate.ValidationException
    +
    +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
    +  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
    +
    +  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
    +    val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
    +    val newProjectList =
    +      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
    +        e match {
    +          case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
    +            case ne: NamedExpression => ne
    +            case e if !e.valid => u
    +            case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
    +            case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
    +          }
    +          case _ => throw new IllegalArgumentException
    +        }
    +    }
    +    Project(newProjectList, child)
    +  }
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    val allAlias = projectList.forall(_.isInstanceOf[Alias])
    +    child.toRelNode(relBuilder)
    +    if (allAlias) {
    +      // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
    +      //   Add a projection ourselves (will be automatically removed by 
translation rules).
    +      relBuilder.push(
    +        LogicalProject.create(relBuilder.peek(),
    +          projectList.map(_.toRexNode(relBuilder)).asJava,
    +          projectList.map(_.name).asJava))
    +    } else {
    +      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
    +    }
    +  }
    +}
    +
    +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
    +  override def output: Seq[Attribute] =
    +    throw new UnresolvedException("Invalid call to output on AliasNode")
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
    +    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
    +
    +  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
    +    if (aliasList.length > child.output.length) {
    +      failValidation("Aliasing more fields than we actually have")
    +    } else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
    +      failValidation("`as` only allow string arguments")
    --- End diff --
    
    I think this error message might be confusing because the Scala API does 
not use String arguments. 


> Add a validation phase before construct RelNode using TableAPI
> --------------------------------------------------------------
>
>                 Key: FLINK-3754
>                 URL: https://issues.apache.org/jira/browse/FLINK-3754
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API
>    Affects Versions: 1.0.0
>            Reporter: Yijie Shen
>            Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to