singhpk234 commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204402182
##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java:
##########
@@ -1455,6 +1457,31 @@ public void
testUpdateToWapBranchWithTableBranchIdentifier() {
branch)));
}
+ @Test
+ public void subExpressionEliminationInCodegen() {
+ createAndInitTable("id INT, c1 INT, c2 INT");
+
+ sql("INSERT INTO TABLE %s VALUES (1, 11, 111), (2, 22, 222)", tableName);
+ createBranchIfNeeded();
+
+ // disable AQE to see the final plan with codegen in EXPLAIN
Review Comment:
can we do something like :
```
def getCodeAndCommentForUpdateRowExec(df: DataFrame): CodeAndComment = {
val plan = df.queryExecution.executedPlan
plan.execute()
val updateRowExec = findTopLevelUpdateRowExec(plan)
getCodeAndComment(updateRowExec.head)
}
def findTopLevelUpdateRowExec(plan: SparkPlan): Seq[UpdateRowExec] = {
filterByType[UpdateRowExec](plan)
}
private def getCodeAndComment(plan: SparkPlan): CodeAndComment = {
val codeGenSubTree = WholeStageCodegenExec(plan)(1)
val codeAndComment = codeGenSubTree.doCodeGen()._2
try {
CodeGenerator.compile(codeAndComment)
} catch {
case e: Exception =>
val msg =
s"""
|failed to compile:
|Subtree:
|$codeGenSubTree
|Generated code:
|${CodeFormatter.format(codeAndComment)}
""".stripMargin
fail(msg, e)
}
codeAndComment
}
```
This way we can test both AQE without AQE plans, thoughts @aokolnychyi ?
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
copy(child = newChild)
}
+ override def usedInputs: AttributeSet = {
+ // only attributes used at least twice should be evaluated before this
plan,
+ // otherwise defer the evaluation until an attribute is actually used
+ val usedExprIds = insertOutput.flatMap(_.collect {
+ case attr: Attribute => attr.exprId
+ })
+ val usedMoreThanOnceExprIds = usedExprIds.groupBy(id =>
id).filter(_._2.size > 1).keySet
+ references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
Review Comment:
should we also make needCopyResult true, considering this operator produces
two row for a single row.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L348
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
copy(child = newChild)
}
+ override def usedInputs: AttributeSet = {
+ // only attributes used at least twice should be evaluated before this
plan,
+ // otherwise defer the evaluation until an attribute is actually used
+ val usedExprIds = insertOutput.flatMap(_.collect {
+ case attr: Attribute => attr.exprId
+ })
+ val usedMoreThanOnceExprIds = usedExprIds.groupBy(id =>
id).filter(_._2.size > 1).keySet
+ references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
+ }
+
+ override protected def doProduce(ctx: CodegenContext): String = {
+ child.asInstanceOf[CodegenSupport].produce(ctx, this)
+ }
+
+ override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row:
ExprCode): String = {
+ // no need to perform sub expression elimination for the delete projection
as
+ // as it only outputs row ID and metadata attributes without any changes
+ val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+ val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+ val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+ val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+ if (conf.subexpressionEliminationEnabled) {
+ val subExprs =
ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+ val subExprsCode =
ctx.evaluateSubExprEliminationState(subExprs.states.values)
+ val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+ insertExprs.map(_.genCode(ctx))
+ }
+ val localInputVars = subExprs.exprCodesNeedEvaluate
+ (subExprsCode, vars, localInputVars)
+ } else {
+ ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+ }
+
+ val nonDeterministicInsertAttrs = insertOutput.zip(output)
+ .collect { case (expr, attr) if !expr.deterministic => attr }
+ val nonDeterministicInsertAttrSet =
AttributeSet(nonDeterministicInsertAttrs)
+
+ s"""
+ |// generate DELETE record
+ |${consume(ctx, deleteOutputVars)}
+ |// generate INSERT records
+ |${evaluateVariables(insertLocalInputVars)}
Review Comment:
wondering if we should split DELETE & insert record generation in separate
functions as it might potentially cause
`CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT` limit exceeded, expand exec, also
had to this at one point https://github.com/apache/spark/pull/32457, Thoughts
@aokolnychyi ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]