dawidwys commented on a change in pull request #8018: [FLINK-11884][table]
Separate creation and validation of LogicalNodes from TableImpl
URL: https://github.com/apache/flink/pull/8018#discussion_r269571621
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
##########
@@ -19,175 +19,13 @@
package org.apache.flink.table.plan
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.logical.{LogicalNode, LogicalOverWindow,
Project}
import org.apache.flink.table.typeutils.RowIntervalTypeInfo
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-
object ProjectionTranslator {
- /**
- * Extracts and deduplicates all aggregation and window property
expressions (zero, one, or more)
- * from the given expressions.
- *
- * @param exprs a list of expressions to extract
- * @param tableEnv the TableEnvironment
- * @return a Tuple2, the first field contains the extracted and
deduplicated aggregations,
- * and the second field contains the extracted and deduplicated
window properties.
- */
- def extractAggregationsAndProperties(
- exprs: Seq[PlannerExpression],
- tableEnv: TableEnvironment)
- : (Map[PlannerExpression, String], Map[PlannerExpression, String]) = {
- exprs.foldLeft((Map[PlannerExpression, String](), Map[PlannerExpression,
String]())) {
- (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
- }
- }
-
- /** Identifies and deduplicates aggregation functions and window properties.
*/
- private def identifyAggregationsAndProperties(
- exp: PlannerExpression,
- tableEnv: TableEnvironment,
- aggNames: Map[PlannerExpression, String],
- propNames: Map[PlannerExpression, String])
- : (Map[PlannerExpression, String], Map[PlannerExpression, String]) = {
-
- exp match {
- case agg: Aggregation =>
- if (aggNames contains agg) {
- (aggNames, propNames)
- } else {
- (aggNames + (agg -> tableEnv.createUniqueAttributeName()), propNames)
- }
- case prop: WindowProperty =>
- if (propNames contains prop) {
- (aggNames, propNames)
- } else {
- (aggNames, propNames + (prop ->
tableEnv.createUniqueAttributeName()))
- }
- case l: LeafExpression =>
- (aggNames, propNames)
- case u: UnaryExpression =>
- identifyAggregationsAndProperties(u.child, tableEnv, aggNames,
propNames)
- case b: BinaryExpression =>
- val l = identifyAggregationsAndProperties(b.left, tableEnv, aggNames,
propNames)
- identifyAggregationsAndProperties(b.right, tableEnv, l._1, l._2)
-
- case sfc @ PlannerScalarFunctionCall(clazz, args) =>
- args.foldLeft((aggNames, propNames)){
- (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
- }
-
- // General expression
- case e: PlannerExpression =>
- e.productIterator.foldLeft((aggNames, propNames)){
- (x, y) => y match {
- case e: PlannerExpression => identifyAggregationsAndProperties(e,
tableEnv, x._1, x._2)
- case _ => (x._1, x._2)
- }
- }
-
- // Expression is null
- case null =>
Review comment:
Not necesserily. We will still need to make sure in those places that user
did not pass null (or add automatic conversion to null literal there)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services