This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b689a711ef9 [HUDI-9354] Support displaying complete dag for merge into statement in spark web ui (#13239)
b689a711ef9 is described below

commit b689a711ef9ac9dff9cf0c80b3945163a784f63d
Author: wangyinsheng <[email protected]>
AuthorDate: Thu May 1 08:44:13 2025 +0800

    [HUDI-9354] Support displaying complete dag for merge into statement in spark web ui (#13239)
    
    Co-authored-by: wangyinsheng <[email protected]>
---
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  7 ++--
 .../hudi/command/MergeIntoHoodieTableCommand.scala | 38 +++++++++++-----------
 2 files changed, 24 insertions(+), 21 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index be7e7ceabfb..87296d177f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -427,8 +427,11 @@ case class ResolveImplementations(sparkSession: 
SparkSession) extends Rule[Logic
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       plan match {
         // Convert to MergeIntoHoodieTableCommand
-        case mit@MatchMergeIntoTable(target@ResolvesToHudiTable(_), _, _) if 
mit.resolved =>
-          
MergeIntoHoodieTableCommand(ReplaceExpressions(mit).asInstanceOf[MergeIntoTable])
+        case mit@MatchMergeIntoTable(target@ResolvesToHudiTable(table), _, _) 
if mit.resolved =>
+          val catalogTable = HoodieCatalogTable(sparkSession, table)
+          val command = 
MergeIntoHoodieTableCommand(ReplaceExpressions(mit).asInstanceOf[MergeIntoTable],
 catalogTable, sparkSession, null)
+          val inputPlan = command.getProcessedInputPlan
+          command.copy(query = inputPlan)
 
         // Convert to UpdateHoodieTableCommand
         case ut@UpdateTable(plan@ResolvesToHudiTable(_), _, _) if ut.resolved 
=>
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 31f2d9cdbf6..06b452632cf 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -42,12 +42,13 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, BoundReference, EqualTo, Expression, Literal, 
NamedExpression, PredicateHelper}
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
-import org.apache.spark.sql.catalyst.plans.LeftOuter
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, 
getPartitionPathFieldWriteConfig}
-import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
 import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand._
 import 
org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
@@ -105,12 +106,21 @@ class MergeIntoFieldTypeMismatchException(message: String)
  *
  * TODO explain workflow for MOR tables
  */
-case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends 
HoodieLeafRunnableCommand
+case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable,
+                                       hoodieCatalogTable: HoodieCatalogTable,
+                                       sparkSession: SparkSession,
+                                       query: LogicalPlan) extends 
DataWritingCommand
   with SparkAdapterSupport
   with ProvidesHoodieConfig
   with PredicateHelper {
 
-  private var sparkSession: SparkSession = _
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
+  override def outputColumnNames: Seq[String] = {
+    query.output.map(_.name)
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = copy(query = newChild)
 
   /**
    * The target table schema without hoodie meta fields.
@@ -118,12 +128,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   private lazy val targetTableSchema =
     removeMetaFields(mergeInto.targetTable.schema).fields
 
-  private lazy val hoodieCatalogTable = 
sparkAdapter.resolveHoodieTable(mergeInto.targetTable) match {
-    case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
-    case _ =>
-      failAnalysis(s"Failed to resolve MERGE INTO statement into the Hudi 
table. Got instead: ${mergeInto.targetTable}")
-  }
-
   private lazy val targetTableType = hoodieCatalogTable.tableTypeName
 
   /**
@@ -266,14 +270,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         updatingActions.flatMap(_.assignments)).head
     }
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    this.sparkSession = sparkSession
+  override def run(sparkSession: SparkSession, inputPlan: SparkPlan): Seq[Row] 
= {
     // TODO move to analysis phase
     // Create the write parameters
     val props = buildMergeIntoConfig(hoodieCatalogTable)
     validate(props)
 
-    val processedInputDf: DataFrame = getProcessedInputDf
+    val processedInputDf: DataFrame = 
sparkSession.internalCreateDataFrame(inputPlan.execute(), inputPlan.schema)
     // Do the upsert
     executeUpsert(processedInputDf, props)
     // Refresh the table in the catalog
@@ -340,7 +343,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * <li>{@code ts = source.sts}</li>
    * </ul>
    */
-  private def getProcessedInputDf: DataFrame = {
+  def getProcessedInputPlan: LogicalPlan = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
     // For pkless table, we need to project the meta columns by joining with 
the target table;
@@ -394,10 +397,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         case _ => attr
       }
     }
-
-    val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns, 
inputPlan)
-
-    Dataset.ofRows(sparkSession, amendedPlan)
+    Project(adjustedSourceTableOutput ++ additionalColumns, inputPlan)
   }
 
   /**
@@ -725,7 +725,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // NOTE: We're relying on [[sourceDataset]] here instead of 
[[mergeInto.sourceTable]],
     //       as it could be amended to add missing primary-key and/or 
precombine columns.
     //       Please check [[sourceDataset]] scala-doc for more details
-    (getProcessedInputDf.queryExecution.analyzed.output ++ 
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
+    (query.output ++ mergeInto.targetTable.output).filterNot(a => 
isMetaField(a.name))
   }
 
   private def validateInsertingAssignmentExpression(expr: Expression): Unit = {

Reply via email to