This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c86093f11e28 [SPARK-53146][CONNECT][SQL] Make MergeIntoTable in SparkConnectPlanner side effect free
c86093f11e28 is described below

commit c86093f11e28380c8d72f8855f4242e970a41de5
Author: Yihong He <heyihong...@gmail.com>
AuthorDate: Mon Aug 18 16:13:51 2025 +0800

    [SPARK-53146][CONNECT][SQL] Make MergeIntoTable in SparkConnectPlanner side effect free

    ### What changes were proposed in this pull request?

    This PR refactors the `MergeIntoTable` handling in `SparkConnectPlanner` to make it side-effect free by separating the transformation and execution phases.

    ### Why are the changes needed?

    It makes `MergeIntoTable` handling side-effect free: transforming the command into a `LogicalPlan` no longer executes it.

    ### Does this PR introduce _any_ user-facing change?

    **No**. This is a purely internal refactoring.

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    Generated-by: Cursor 1.3.9

    Closes #51876 from heyihong/SPARK-53146.

    Authored-by: Yihong He <heyihong...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala      | 12 ++++++------
 .../scala/org/apache/spark/sql/classic/MergeIntoWriter.scala |  9 ++++++---
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 3913416c0dde..5f4530c0b6eb 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2650,6 +2650,8 @@ class SparkConnectPlanner(
         Some(transformWriteOperation(command.getWriteOperation))
       case proto.Command.CommandTypeCase.WRITE_OPERATION_V2 =>
         Some(transformWriteOperationV2(command.getWriteOperationV2))
+      case proto.Command.CommandTypeCase.MERGE_INTO_TABLE_COMMAND =>
+        Some(transformMergeIntoTableCommand(command.getMergeIntoTableCommand))
       case _ =>
         None
     }
@@ -2699,8 +2701,6 @@ class SparkConnectPlanner(
         handleCheckpointCommand(command.getCheckpointCommand, responseObserver)
       case proto.Command.CommandTypeCase.REMOVE_CACHED_REMOTE_RELATION_COMMAND =>
         handleRemoveCachedRemoteRelationCommand(command.getRemoveCachedRemoteRelationCommand)
-      case proto.Command.CommandTypeCase.MERGE_INTO_TABLE_COMMAND =>
-        handleMergeIntoTableCommand(command.getMergeIntoTableCommand)
       case proto.Command.CommandTypeCase.ML_COMMAND =>
         handleMlCommand(command.getMlCommand, responseObserver)
       case proto.Command.CommandTypeCase.PIPELINE_COMMAND =>
@@ -3758,7 +3758,8 @@ class SparkConnectPlanner(
     executeHolder.eventsManager.postFinished()
   }
 
-  private def handleMergeIntoTableCommand(cmd: proto.MergeIntoTableCommand): Unit = {
+  private def transformMergeIntoTableCommand(cmd: proto.MergeIntoTableCommand)(
+      tracker: QueryPlanningTracker): LogicalPlan = {
     def transformActions(actions: java.util.List[proto.Expression]): Seq[MergeAction] =
       actions.asScala.map(transformExpression).map(_.asInstanceOf[MergeAction]).toSeq
 
@@ -3766,7 +3767,7 @@
     val notMatchedActions = transformActions(cmd.getNotMatchedActionsList)
     val notMatchedBySourceActions = transformActions(cmd.getNotMatchedBySourceActionsList)
 
-    val sourceDs = Dataset.ofRows(session, transformRelation(cmd.getSourceTablePlan))
+    val sourceDs = Dataset.ofRows(session, transformRelation(cmd.getSourceTablePlan), tracker)
     val mergeInto = sourceDs
       .mergeInto(cmd.getTargetTableName, Column(transformExpression(cmd.getMergeCondition)))
       .asInstanceOf[MergeIntoWriter[Row]]
@@ -3776,8 +3777,7 @@
     if (cmd.getWithSchemaEvolution) {
       mergeInto.withSchemaEvolution()
     }
-    mergeInto.merge()
-    executeHolder.eventsManager.postFinished()
+    mergeInto.mergeCommand()
   }
 
   private val emptyLocalRelation = LocalRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala
index 0269b15061c9..e3c872658c86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala
@@ -57,13 +57,18 @@ class MergeIntoWriter[T] private[sql](table: String, ds: Dataset[T], on: Column)
 
   /** @inheritdoc */
   def merge(): Unit = {
+    val qe = sparkSession.sessionState.executePlan(mergeCommand())
+    qe.assertCommandExecuted()
+  }
+
+  private[sql] def mergeCommand(): LogicalPlan = {
     if (matchedActions.isEmpty && notMatchedActions.isEmpty && notMatchedBySourceActions.isEmpty) {
       throw new SparkRuntimeException(
         errorClass = "NO_MERGE_ACTION_SPECIFIED",
         messageParameters = Map.empty)
     }
 
-    val merge = MergeIntoTable(
+    MergeIntoTable(
       UnresolvedRelation(tableName).requireWritePrivileges(MergeIntoTable.getWritePrivileges(
         matchedActions, notMatchedActions, notMatchedBySourceActions)),
       logicalPlan,
@@ -72,8 +77,6 @@
       notMatchedActions.toSeq,
       notMatchedBySourceActions.toSeq,
       schemaEvolutionEnabled)
-    val qe = sparkSession.sessionState.executePlan(merge)
-    qe.assertCommandExecuted()
   }
 
   override protected[sql] def insertAll(condition: Option[Column]): this.type = {
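
To see the shape of the planner-side change outside the diff context: the `MERGE_INTO_TABLE_COMMAND` case moves from the side-effecting `handleCommand` dispatch into the pure `transformCommand` dispatch, so one generic path can execute the resulting plan and post the finished event. Below is a minimal, self-contained Scala sketch of that split; every name in it (`PlannerDispatch`, `Command`, `LogicalPlanStub`, the `println` placeholders) is a hypothetical stand-in, not the real Spark Connect API.

    // Sketch only: stand-ins for proto.Command and catalyst's LogicalPlan.
    object PlannerDispatch {
      sealed trait Command
      case object MergeIntoTableCommand extends Command
      case object OtherCommand extends Command

      final case class LogicalPlanStub(name: String)

      // Pure phase: build a plan; no execution, no events.
      def transformCommand(cmd: Command): Option[LogicalPlanStub] = cmd match {
        case MergeIntoTableCommand => Some(LogicalPlanStub("MergeIntoTable"))
        case _ => None
      }

      // Side-effecting phase: one generic place executes transformed plans
      // and posts the "finished" event, instead of each handler doing both.
      def handleCommand(cmd: Command): Unit = transformCommand(cmd) match {
        case Some(plan) =>
          println(s"executePlan(${plan.name})")
          println("eventsManager.postFinished()")
        case None =>
          println("dispatch to a command-specific handler")
      }

      def main(args: Array[String]): Unit = handleCommand(MergeIntoTableCommand)
    }

The payoff, per the PR description, is that `transformMergeIntoTableCommand` is side-effect free: the planner can obtain the `LogicalPlan` (together with a `QueryPlanningTracker`) without running the merge.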
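
The `MergeIntoWriter` change follows the same recipe: `merge()` used to validate, build the `MergeIntoTable` node, and execute it in one method; now `mergeCommand()` does the pure part and `merge()` is a thin executing wrapper. Here is a compilable toy version of that split, where `FakePlan`, `FakeSession`, and `Writer` are hypothetical stand-ins for `LogicalPlan`, the session state, and `MergeIntoWriter`.

    // Sketch only: models the merge()/mergeCommand() split from the diff.
    object MergeSplitSketch {
      final case class FakePlan(sql: String)

      final class FakeSession {
        // Stand-in for sessionState.executePlan(...) + assertCommandExecuted().
        def executePlan(plan: FakePlan): Unit = println(s"executed: ${plan.sql}")
      }

      final class Writer(session: FakeSession, table: String, actions: Seq[String]) {
        // Pure builder: validates and returns a plan; performs no execution.
        private def mergeCommand(): FakePlan = {
          if (actions.isEmpty) throw new IllegalStateException("NO_MERGE_ACTION_SPECIFIED")
          FakePlan(s"MERGE INTO $table: ${actions.mkString(", ")}")
        }

        // Side-effecting entry point, now a thin wrapper over the pure builder.
        def merge(): Unit = session.executePlan(mergeCommand())
      }

      def main(args: Array[String]): Unit =
        new Writer(new FakeSession, "target_table", Seq("whenMatched update")).merge()
    }

In the actual patch the builder is `private[sql]` rather than `private`, which is what lets the Connect planner call `mergeCommand()` to get the plan without triggering execution.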