aokolnychyi commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1202755142
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
copy(child = newChild)
}
+ override def usedInputs: AttributeSet = {
+ // only attributes used at least twice should be evaluated before this
plan,
+ // otherwise defer the evaluation until an attribute is actually used
+ val usedExprIds = insertOutput.flatMap(_.collect {
+ case attr: Attribute => attr.exprId
+ })
+ val usedMoreThanOnceExprIds = usedExprIds.groupBy(id =>
id).filter(_._2.size > 1).keySet
+ references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
+ }
+
+ override protected def doProduce(ctx: CodegenContext): String = {
+ child.asInstanceOf[CodegenSupport].produce(ctx, this)
+ }
+
+ override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row:
ExprCode): String = {
+ // no need to perform sub expression elimination for the delete projection
+ // as it only outputs row ID and metadata attributes without any changes
+ val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+ val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+ val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+ val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
Review Comment:
This block ensures that identical subexpressions are evaluated only once.
For instance, given `c1 = id - 10, c2 = id - 10`, the expression `id - 10` is
evaluated a single time, which matches the behavior of a regular projection.
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
copy(child = newChild)
}
+ override def usedInputs: AttributeSet = {
+ // only attributes used at least twice should be evaluated before this
plan,
+ // otherwise defer the evaluation until an attribute is actually used
+ val usedExprIds = insertOutput.flatMap(_.collect {
+ case attr: Attribute => attr.exprId
+ })
+ val usedMoreThanOnceExprIds = usedExprIds.groupBy(id =>
id).filter(_._2.size > 1).keySet
+ references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+ }
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = {
+ child.asInstanceOf[CodegenSupport].inputRDDs()
+ }
+
+ override protected def doProduce(ctx: CodegenContext): String = {
+ child.asInstanceOf[CodegenSupport].produce(ctx, this)
+ }
+
+ override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row:
ExprCode): String = {
+ // no need to perform sub expression elimination for the delete projection
+ // as it only outputs row ID and metadata attributes without any changes
+ val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+ val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+ val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+ val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
Review Comment:
This block ensures that identical subexpressions are evaluated only once.
For instance, given `c1 = id - 10, c2 = id - 10`, the expression `id - 10` is
evaluated a single time, which matches the behavior of a regular projection.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]