dawidwys commented on a change in pull request #8018: [FLINK-11884][table]
Separate creation and validation of LogicalNodes from TableImpl
URL: https://github.com/apache/flink/pull/8018#discussion_r269528319
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
##########
@@ -19,175 +19,13 @@
package org.apache.flink.table.plan
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.logical.{LogicalNode, LogicalOverWindow,
Project}
import org.apache.flink.table.typeutils.RowIntervalTypeInfo
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-
object ProjectionTranslator {
- /**
- * Extracts and deduplicates all aggregation and window property
expressions (zero, one, or more)
- * from the given expressions.
- *
- * @param exprs a list of expressions to extract
- * @param tableEnv the TableEnvironment
- * @return a Tuple2, the first field contains the extracted and
deduplicated aggregations,
- * and the second field contains the extracted and deduplicated
window properties.
- */
- def extractAggregationsAndProperties(
- exprs: Seq[PlannerExpression],
- tableEnv: TableEnvironment)
- : (Map[PlannerExpression, String], Map[PlannerExpression, String]) = {
- exprs.foldLeft((Map[PlannerExpression, String](), Map[PlannerExpression,
String]())) {
- (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
- }
- }
-
- /** Identifies and deduplicates aggregation functions and window properties.
*/
- private def identifyAggregationsAndProperties(
- exp: PlannerExpression,
- tableEnv: TableEnvironment,
- aggNames: Map[PlannerExpression, String],
- propNames: Map[PlannerExpression, String])
- : (Map[PlannerExpression, String], Map[PlannerExpression, String]) = {
-
- exp match {
- case agg: Aggregation =>
- if (aggNames contains agg) {
- (aggNames, propNames)
- } else {
- (aggNames + (agg -> tableEnv.createUniqueAttributeName()), propNames)
- }
- case prop: WindowProperty =>
- if (propNames contains prop) {
- (aggNames, propNames)
- } else {
- (aggNames, propNames + (prop ->
tableEnv.createUniqueAttributeName()))
- }
- case l: LeafExpression =>
- (aggNames, propNames)
- case u: UnaryExpression =>
- identifyAggregationsAndProperties(u.child, tableEnv, aggNames,
propNames)
- case b: BinaryExpression =>
- val l = identifyAggregationsAndProperties(b.left, tableEnv, aggNames,
propNames)
- identifyAggregationsAndProperties(b.right, tableEnv, l._1, l._2)
-
- case sfc @ PlannerScalarFunctionCall(clazz, args) =>
- args.foldLeft((aggNames, propNames)){
- (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
- }
-
- // General expression
- case e: PlannerExpression =>
- e.productIterator.foldLeft((aggNames, propNames)){
- (x, y) => y match {
- case e: PlannerExpression => identifyAggregationsAndProperties(e,
tableEnv, x._1, x._2)
- case _ => (x._1, x._2)
- }
- }
-
- // Expression is null
- case null =>
Review comment:
You are right. There is no replacement for that at this moment. Though I
believe this was not the best place for it anyways, as it was applied only in a
subset of operations (projections + aggregations).
I think this check should happen in two places:
- during expression construction
- as first step of methods of `TableImpl` that accept `Expression`
What do you think of creating a follow-up ticket for that?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services