singhpk234 commented on code in PR #7691:
URL: https://github.com/apache/iceberg/pull/7691#discussion_r1204402182


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java:
##########
@@ -1455,6 +1457,31 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() {
                         branch)));
   }
 
+  @Test
+  public void testSubExpressionEliminationInCodegen() {
+    createAndInitTable("id INT, c1 INT, c2 INT");
+
+    sql("INSERT INTO TABLE %s VALUES (1, 11, 111), (2, 22, 222)", tableName);
+    createBranchIfNeeded();
+
+    // disable AQE to see the final plan with codegen in EXPLAIN

Review Comment:
   can we do something like:
   ```
     def getCodeAndCommentForUpdateRowsExec(df: DataFrame): CodeAndComment = {
       val plan = df.queryExecution.executedPlan
       plan.execute()
       val updateRowsExec = findTopLevelUpdateRowsExec(plan)
       getCodeAndComment(updateRowsExec.head)
     }
   
     def findTopLevelUpdateRowsExec(plan: SparkPlan): Seq[UpdateRowsExec] = {
       plan.collect { case exec: UpdateRowsExec => exec }
     }
   
     private def getCodeAndComment(plan: SparkPlan): CodeAndComment = {
       val codeGenSubTree = WholeStageCodegenExec(plan)(1)
       val codeAndComment = codeGenSubTree.doCodeGen()._2
       try {
         CodeGenerator.compile(codeAndComment)
       } catch {
         case e: Exception =>
           val msg =
             s"""
                |failed to compile:
                |Subtree:
                |$codeGenSubTree
                |Generated code:
                |${CodeFormatter.format(codeAndComment)}
              """.stripMargin
           fail(msg, e)
       }
       codeAndComment
     }
   ```
   
   This way we can test both the AQE and non-AQE plans. Thoughts @aokolnychyi?
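   
   For illustration, a hypothetical call site in the test could then look like this (a sketch only: the UPDATE statement and table name are made up, and it assumes Spark's whole-stage codegen names the hoisted sub-expression methods `subExpr*`):
   ```
     // hypothetical usage: run an UPDATE whose SET clauses share a
     // sub-expression (c1 + 1), then inspect the generated code
     val df = spark.sql(s"UPDATE $tableName SET c1 = c1 + 1, c2 = c1 + 1 WHERE id = 1")
     val codeAndComment = getCodeAndCommentForUpdateRowsExec(df)
     // the shared sub-expression should be hoisted into a subExpr_N method
     assert(codeAndComment.body.contains("subExpr"))
   ```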



##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   should we also make `needCopyResult` true, considering this operator produces two rows for a single input row?
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L348
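   
   A minimal sketch of what that override could look like in `UpdateRowsExec`, assuming the operator keeps emitting two output rows per input row:
   ```
     // each input row is turned into a DELETE record and an INSERT record,
     // so downstream operators that buffer rows must copy them instead of
     // holding on to a row that will be overwritten by the next output
     override def needCopyResult: Boolean = true
   ```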



##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/UpdateRowsExec.scala:
##########
@@ -51,6 +58,59 @@ case class UpdateRowsExec(
     copy(child = newChild)
   }
 
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val usedExprIds = insertOutput.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    // no need to perform sub-expression elimination for the delete projection,
+    // as it only outputs row ID and metadata attributes without any changes
+    val deleteExprs = BindReferences.bindReferences(deleteOutput, child.output)
+    val deleteOutputVars = deleteExprs.map(_.genCode(ctx))
+
+    val insertExprs = BindReferences.bindReferences(insertOutput, child.output)
+    val (insertSubExprsCode, insertOutputVars, insertLocalInputVars) =
+      if (conf.subexpressionEliminationEnabled) {
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(insertExprs)
+        val subExprsCode = ctx.evaluateSubExprEliminationState(subExprs.states.values)
+        val vars = ctx.withSubExprEliminationExprs(subExprs.states) {
+          insertExprs.map(_.genCode(ctx))
+        }
+        val localInputVars = subExprs.exprCodesNeedEvaluate
+        (subExprsCode, vars, localInputVars)
+      } else {
+        ("", insertExprs.map(_.genCode(ctx)), Seq.empty)
+      }
+
+    val nonDeterministicInsertAttrs = insertOutput.zip(output)
+      .collect { case (expr, attr) if !expr.deterministic => attr }
+    val nonDeterministicInsertAttrSet = AttributeSet(nonDeterministicInsertAttrs)
+
+    s"""
+       |// generate DELETE record
+       |${consume(ctx, deleteOutputVars)}
+       |// generate INSERT records
+       |${evaluateVariables(insertLocalInputVars)}

Review Comment:
   wondering if we should split the DELETE & INSERT record generation into separate functions, as keeping both in one method might exceed `CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT`. `ExpandExec` also had to do this at one point: https://github.com/apache/spark/pull/32457. Thoughts @aokolnychyi?
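   
   A rough sketch of the shape that split could take (the method name is hypothetical, and the body is elided: the tricky part in a real change is that a split-out method can no longer see the local column variables of the enclosing `consume()` scope, so its inputs would have to be passed or materialized explicitly):
   ```
     // hypothetical: hoist INSERT record generation into its own generated
     // method via ctx.addNewFunction so consume() stays below the
     // huge-method threshold
     val row = ctx.freshName("row")
     val insertFnName = ctx.addNewFunction(
       "generateInsertRecord",
       s"""
          |private void generateInsertRecord(InternalRow $row) throws java.io.IOException {
          |  // ... evaluate insertOutput against $row and emit the INSERT record ...
          |}
        """.stripMargin)
   ```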


