Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62682155 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala --- @@ -20,27 +20,98 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -case class UnresolvedFieldReference(override val name: String) extends LeafExpression { +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.validate.ExprValidationResult + +object NamedExpression { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + def newExprId: Int = curId.getAndIncrement() +} + +trait NamedExpression extends Expression { + def name: String + def exprId: Int + def toAttribute: Attribute +} + +abstract class Attribute extends LeafExpression with NamedExpression { + override def toAttribute: Attribute = this + + def withName(newName: String): Attribute +} + +case class UnresolvedFieldReference(name: String) extends Attribute { + override def exprId: Int = throw new UnresolvedException(s"calling exprId on ${this.getClass}") + override def toString = "\"" + name + override def withName(newName: String): Attribute = UnresolvedFieldReference(newName) + + override def dataType: TypeInformation[_] = + throw new UnresolvedException(s"calling dataType on ${this.getClass}") + + override def validateInput(): ExprValidationResult = + ExprValidationResult.ValidationFailure(s"Unresolved reference $name") +} + +case class ResolvedFieldReference( + name: String, + dataType: TypeInformation[_])( + val exprId: Int = NamedExpression.newExprId) extends Attribute { + + override def toString = s"'$name" + override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { relBuilder.field(name) } -} -case class ResolvedFieldReference(override val name: String) extends LeafExpression { - override def toString = s"'$name" + override def withName(newName: String): Attribute = { + if (newName == name) { + this + } else { + ResolvedFieldReference(newName, dataType)(exprId) + } + } } -case class Naming(child: Expression, override val name: String) extends UnaryExpression { +case class Alias(child: Expression, name: String) + extends UnaryExpression with NamedExpression { + val exprId: Int = NamedExpression.newExprId + override def toString = s"$child as '$name" override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { relBuilder.alias(child.toRexNode, name) } - override def makeCopy(anyRefs: Seq[AnyRef]): this.type = { + override def dataType: TypeInformation[_] = child.dataType + + override def makeCopy(anyRefs: Array[AnyRef]): this.type = { val child: Expression = anyRefs.head.asInstanceOf[Expression] copy(child, name).asInstanceOf[this.type] } + + override def toAttribute: Attribute = { + if (valid) { + ResolvedFieldReference(name, child.dataType)(exprId) + } else { + UnresolvedFieldReference(name) + } + } +} + +case class UnresolvedAlias( + child: Expression, + aliasName: Option[String] = None) extends UnaryExpression with NamedExpression { + + override def name: String = + throw new UnresolvedException("Invalid call to name on UnresolvedAlias") + override def toAttribute: Attribute = --- End diff -- can you add a new lines here?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---