Below is my queryExecution.optimizedPlan before applying my rule:
01 Project [x#9, p#10, q#11, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93]
02 +- InMemoryRelation [x#9, p#10, q#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
03    :  +- *SerializeFromObject [assertnotnull(input[0, eic.R0, true], top level non-flat input object).x AS x#9, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).p) AS p#10, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).q) AS q#11]
04    :     +- *MapElements , obj#8: eic.R0
05    :        +- *DeserializeToObject newInstance(class java.lang.Long), obj#7: java.lang.Long
05    :           +- *Range (0, 3, step=1, splits=Some(2))
In line 01 I need to swap the positions of udfA and udfB, like this:
01 Project [x#9, p#10, q#11, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28]
When I try to change the order of the attributes in a Project operator in
Spark SQL via a Catalyst optimization rule, the query returns invalid
values. Maybe I'm not doing everything that is needed. I'm only changing the
order of the NamedExpression objects in the fields parameter:
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

object ReorderColumnsOnProjectOptimizationRule extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case Project(fields: Seq[NamedExpression], child) =>
      if (checkCondition(fields)) Project(newFieldsObject(fields), child)
      else Project(fields, child)
    case _ => plan
  }

  private def newFieldsObject(fields: Seq[NamedExpression]): Seq[NamedExpression] = {
    // compare UDFs computation cost and return the new NamedExpression list
    . . .
  }

  private def checkCondition(fields: Seq[NamedExpression]): Boolean = {
    // compare UDFs computation cost and return a Boolean that decides whether
    // to change the order of the field list
    . . .
  }

  . . .
}
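
To make the intended reordering concrete, here is a minimal sketch in plain
Scala, independent of Catalyst. It is not the elided body of newFieldsObject
or checkCondition: the Field class, the cost values, and the choice of
ascending cost order are all assumptions made for illustration only. It shows
one way to sort just the UDF-backed columns by cost while leaving the plain
attributes (x, p, q) in their original slots.

```scala
// Hypothetical stand-in for a projected column (not a Catalyst class).
// `cost` is None for a plain attribute and Some(estimate) for a UDF column.
final case class Field(name: String, cost: Option[Int])

object ReorderSketch {

  /** Sorts the UDF-backed fields by ascending cost (an assumed policy);
    * every other field keeps its original position. */
  def reorder(fields: Seq[Field]): Seq[Field] = {
    // sortBy is stable, so UDFs with equal cost keep their relative order.
    val sortedUdfs = fields.filter(_.cost.isDefined).sortBy(_.cost.get).iterator
    // Walk the original list and fill each UDF slot with the next sorted UDF.
    fields.map(f => if (f.cost.isDefined) sortedUdfs.next() else f)
  }

  /** True only when reordering would actually change the field list. */
  def needsReorder(fields: Seq[Field]): Boolean = reorder(fields) != fields
}
```

With made-up costs of 2 for udfB and 1 for udfA, the list x, p, q, udfB, udfA
becomes x, p, q, udfA, udfB, and needsReorder returns false for the result.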
Note: I'm registering my rule through the session's extraOptimizations:
spark.experimental.extraOptimizations = Seq(ReorderColumnsOnProjectOptimizationRule)
Any suggestions will be of great help.



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
