szehon-ho commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1203155869
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
copy(child = newChild)
}
+ override def usedInputs: AttributeSet = {
+ // only attributes used at least twice should be evaluated before this
plan,
+ // otherwise defer the evaluation until an attribute is actually used
+ val usedExprIds = insertOutput.flatMap(_.collect {
+ case attr: Attribute => attr.exprId
+ })
+ val usedMoreThanOnceExprIds = usedExprIds.groupBy(id =>
id).filter(_._2.size > 1).keySet
+ references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
+ }
+
+ override protected def doProduce(ctx: CodegenContext): String = {
+ child.asInstanceOf[CodegenSupport].produce(ctx, this)
+ }
+
+ override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row:
ExprCode): String = {
+ // no need to perform sub expression elimination for the delete projection
as
+ // as it only outputs row ID and metadata attributes without any changes
+ val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+ val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+ val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+ val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+ if (conf.subexpressionEliminationEnabled) {
+ val subExprs =
ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+ val subExprsCode =
ctx.evaluateSubExprEliminationState(subExprs.states.values)
+ val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+ insertExprs.map(_.genCode(ctx))
+ }
+ val localInputVars = subExprs.exprCodesNeedEvaluate
+ (subExprsCode, vars, localInputVars)
+ } else {
+ ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+ }
+
+ val nonDeterministicInsertAttrs = insertOutput.zip(output)
Review Comment:
Trying to follow this: is there any test that activates this part?
I walked through the new tests but didn't see this path activated, though maybe
it's covered by existing ones.
And there's no concept of this in delete records, is there?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]