This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c86093f11e28 [SPARK-53146][CONNECT][SQL] Make MergeIntoTable in SparkConnectPlanner side effect free
c86093f11e28 is described below

commit c86093f11e28380c8d72f8855f4242e970a41de5
Author: Yihong He <heyihong...@gmail.com>
AuthorDate: Mon Aug 18 16:13:51 2025 +0800

    [SPARK-53146][CONNECT][SQL] Make MergeIntoTable in SparkConnectPlanner side effect free
    
    ### What changes were proposed in this pull request?
    
    This PR refactors the `MergeIntoTable` handling in `SparkConnectPlanner` to make it side-effect free by separating the transformation and execution phases.
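
    The refactoring follows a transform-vs-execute split: `transformMergeIntoTableCommand` now only builds and returns the `LogicalPlan` (via the new `MergeIntoWriter.mergeCommand()`), and execution moves out of this handler into the shared command-execution path. A minimal, self-contained sketch of that pattern follows; the names and types below are illustrative stand-ins, not Spark's actual API:

    ```scala
    // Hypothetical stand-ins illustrating the "side-effect free" split.
    object MergePlannerSketch {
      // Stand-in for a logical plan: a pure description of the merge, nothing runs yet.
      final case class MergePlanStub(targetTable: String, condition: String)

      // Transformation phase: builds the plan and has no side effects.
      def transformMergeCommand(targetTable: String, condition: String): MergePlanStub =
        MergePlanStub(targetTable, condition)

      // Execution phase: the only place where work actually happens.
      def execute(plan: MergePlanStub): Unit =
        println(s"executing MERGE INTO ${plan.targetTable} ON ${plan.condition}")

      def main(args: Array[String]): Unit = {
        val plan = transformMergeCommand("catalog.db.target", "t.id = s.id")
        execute(plan) // execution is an explicit, separate step owned by the caller
      }
    }
    ```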
    
    ### Why are the changes needed?
    
    Make `MergeIntoTable` handling in `SparkConnectPlanner` side-effect free: transforming the command now only builds the logical plan instead of executing it immediately.
    
    ### Does this PR introduce _any_ user-facing change?
    
    **No**. This is a purely internal refactoring.
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor 1.3.9
    
    Closes #51876 from heyihong/SPARK-53146.
    
    Authored-by: Yihong He <heyihong...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala      | 12 ++++++------
 .../scala/org/apache/spark/sql/classic/MergeIntoWriter.scala |  9 ++++++---
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 3913416c0dde..5f4530c0b6eb 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2650,6 +2650,8 @@ class SparkConnectPlanner(
         Some(transformWriteOperation(command.getWriteOperation))
       case proto.Command.CommandTypeCase.WRITE_OPERATION_V2 =>
         Some(transformWriteOperationV2(command.getWriteOperationV2))
+      case proto.Command.CommandTypeCase.MERGE_INTO_TABLE_COMMAND =>
+        Some(transformMergeIntoTableCommand(command.getMergeIntoTableCommand))
       case _ =>
         None
     }
@@ -2699,8 +2701,6 @@ class SparkConnectPlanner(
         handleCheckpointCommand(command.getCheckpointCommand, responseObserver)
      case proto.Command.CommandTypeCase.REMOVE_CACHED_REMOTE_RELATION_COMMAND =>
        handleRemoveCachedRemoteRelationCommand(command.getRemoveCachedRemoteRelationCommand)
-      case proto.Command.CommandTypeCase.MERGE_INTO_TABLE_COMMAND =>
-        handleMergeIntoTableCommand(command.getMergeIntoTableCommand)
       case proto.Command.CommandTypeCase.ML_COMMAND =>
         handleMlCommand(command.getMlCommand, responseObserver)
       case proto.Command.CommandTypeCase.PIPELINE_COMMAND =>
@@ -3758,7 +3758,8 @@ class SparkConnectPlanner(
     executeHolder.eventsManager.postFinished()
   }
 
-  private def handleMergeIntoTableCommand(cmd: proto.MergeIntoTableCommand): Unit = {
+  private def transformMergeIntoTableCommand(cmd: proto.MergeIntoTableCommand)(
+      tracker: QueryPlanningTracker): LogicalPlan = {
     def transformActions(actions: java.util.List[proto.Expression]): Seq[MergeAction] =
       actions.asScala.map(transformExpression).map(_.asInstanceOf[MergeAction]).toSeq
 
@@ -3766,7 +3767,7 @@ class SparkConnectPlanner(
     val notMatchedActions = transformActions(cmd.getNotMatchedActionsList)
     val notMatchedBySourceActions = transformActions(cmd.getNotMatchedBySourceActionsList)
 
-    val sourceDs = Dataset.ofRows(session, transformRelation(cmd.getSourceTablePlan))
+    val sourceDs = Dataset.ofRows(session, transformRelation(cmd.getSourceTablePlan), tracker)
     val mergeInto = sourceDs
       .mergeInto(cmd.getTargetTableName, Column(transformExpression(cmd.getMergeCondition)))
       .asInstanceOf[MergeIntoWriter[Row]]
@@ -3776,8 +3777,7 @@ class SparkConnectPlanner(
     if (cmd.getWithSchemaEvolution) {
       mergeInto.withSchemaEvolution()
     }
-    mergeInto.merge()
-    executeHolder.eventsManager.postFinished()
+    mergeInto.mergeCommand()
   }
 
   private val emptyLocalRelation = LocalRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala
index 0269b15061c9..e3c872658c86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala
@@ -57,13 +57,18 @@ class MergeIntoWriter[T] private[sql](table: String, ds: Dataset[T], on: Column)
 
   /** @inheritdoc */
   def merge(): Unit = {
+    val qe = sparkSession.sessionState.executePlan(mergeCommand())
+    qe.assertCommandExecuted()
+  }
+
+  private[sql] def mergeCommand(): LogicalPlan = {
     if (matchedActions.isEmpty && notMatchedActions.isEmpty && notMatchedBySourceActions.isEmpty) {
       throw new SparkRuntimeException(
         errorClass = "NO_MERGE_ACTION_SPECIFIED",
         messageParameters = Map.empty)
     }
 
-    val merge = MergeIntoTable(
+    MergeIntoTable(
       UnresolvedRelation(tableName).requireWritePrivileges(MergeIntoTable.getWritePrivileges(
         matchedActions, notMatchedActions, notMatchedBySourceActions)),
       logicalPlan,
@@ -72,8 +77,6 @@ class MergeIntoWriter[T] private[sql](table: String, ds: Dataset[T], on: Column)
       notMatchedActions.toSeq,
       notMatchedBySourceActions.toSeq,
       schemaEvolutionEnabled)
-    val qe = sparkSession.sessionState.executePlan(merge)
-    qe.assertCommandExecuted()
   }
 
   override protected[sql] def insertAll(condition: Option[Column]): this.type = {

