aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1203217632
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
copy(child = newChild)
}
+ override def usedInputs: AttributeSet = {
+ // only attributes used at least twice should be evaluated before this
plan,
+ // otherwise defer the evaluation until an attribute is actually used
+ val usedExprIds = insertOutput.flatMap(_.collect {
+ case attr: Attribute => attr.exprId
+ })
+ val usedMoreThanOnceExprIds = usedExprIds.groupBy(id =>
id).filter(_._2.size > 1).keySet
+ references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
+ }
+
+ override protected def doProduce(ctx: CodegenContext): String = {
+ child.asInstanceOf[CodegenSupport].produce(ctx, this)
+ }
+
+ override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row:
ExprCode): String = {
+ // no need to perform sub expression elimination for the delete projection
+ // as it only outputs row ID and metadata attributes without any changes
+ val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+ val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+ val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+ val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+ if (conf.subexpressionEliminationEnabled) {
+ val subExprs =
ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+ val subExprsCode =
ctx.evaluateSubExprEliminationState(subExprs.states.values)
+ val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+ insertExprs.map(_.genCode(ctx))
+ }
+ val localInputVars = subExprs.exprCodesNeedEvaluate
+ (subExprsCode, vars, localInputVars)
+ } else {
+ ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+ }
+
+ val nonDeterministicInsertAttrs = insertOutput.zip(output)
Review Comment:
Unfortunately, Spark only allows non-deterministic expressions in a few
nodes and all custom nodes are prohibited. We will need to add this to Spark to
allow non-deterministic expressions in assignments. It is a similar problem in
MERGE operations. Until then, I can't add a test. That said, some vendors that
control Spark may be able to modify Spark so this logic would apply.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]