This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7db96c71473 [HUDI-9324]Support displaying complete dag for delete statement in spark web ui (#13176)
7db96c71473 is described below

commit 7db96c714739de7adc5f38f38c90b4455d50b9ee
Author: wangyinsheng <[email protected]>
AuthorDate: Mon Apr 21 07:21:46 2025 +0800

    [HUDI-9324]Support displaying complete dag for delete statement in spark web ui (#13176)
    
    Co-authored-by: wangyinsheng <[email protected]>
---
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  6 ++-
 .../hudi/command/DeleteHoodieTableCommand.scala    | 46 ++++++++++++----------
 2 files changed, 29 insertions(+), 23 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 7e75d139197..be7e7ceabfb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -437,8 +437,10 @@ case class ResolveImplementations(sparkSession: 
SparkSession) extends Rule[Logic
 
 
         // Convert to DeleteHoodieTableCommand
-        case dft@DeleteFromTable(plan@ResolvesToHudiTable(_), _) if 
dft.resolved =>
-          DeleteHoodieTableCommand(dft)
+        case dft@DeleteFromTable(ResolvesToHudiTable(table), _) if 
dft.resolved =>
+          val catalogTable = new HoodieCatalogTable(sparkSession, table)
+          val (plan, config) = 
DeleteHoodieTableCommand.inputPlan(sparkSession, dft, catalogTable)
+          DeleteHoodieTableCommand(catalogTable, plan, config)
 
         // Convert to CompactionHoodieTableCommand
         case ct @ CompactionTable(plan @ ResolvesToHudiTable(table), 
operation, options) if ct.resolved =>
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index ecbd53753b3..998f50f37ab 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -17,30 +17,46 @@
 
 package org.apache.spark.sql.hudi.command
 
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, 
SPARK_SQL_WRITES_PREPPED_KEY}
-import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.table.HoodieTableConfig
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, Project, UpdateTable}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import 
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
-case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends 
HoodieLeafRunnableCommand
+case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query: 
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
   with SparkAdapterSupport
   with ProvidesHoodieConfig {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
-      .map(HoodieCatalogTable(sparkSession, _))
-      .get
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
-    val tableId = catalogTable.table.qualifiedName
+  override def outputColumnNames: Seq[String] = {
+    query.output.map(_.name)
+  }
 
+  override def run(sparkSession: SparkSession, queryPlan: SparkPlan): Seq[Row] 
= {
+    val tableId = catalogTable.table.qualifiedName
     logInfo(s"Executing 'DELETE FROM' command for $tableId")
+    val df = sparkSession.internalCreateDataFrame(queryPlan.execute(), 
queryPlan.schema)
+    HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, 
config, df)
+    sparkSession.catalog.refreshTable(tableId)
+    logInfo(s"Finished executing 'DELETE FROM' command for $tableId")
+    Seq.empty[Row]
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = copy(query = newChild)
+}
 
+object DeleteHoodieTableCommand extends SparkAdapterSupport with 
ProvidesHoodieConfig{
+
+  def inputPlan(sparkSession: SparkSession, dft: DeleteFromTable, 
catalogTable: HoodieCatalogTable): (LogicalPlan, Map[String, String]) = {
     val condition = sparkAdapter.extractDeleteCondition(dft)
 
     val config = if 
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
@@ -74,19 +90,7 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) 
extends HoodieLeafRunn
     } else {
       targetLogicalPlan
     }
-
-    val df = Dataset.ofRows(sparkSession, filteredPlan)
-
-    df.write.format("hudi")
-      .mode(SaveMode.Append)
-      .options(config)
-      .save()
-
-    sparkSession.catalog.refreshTable(tableId)
-
-    logInfo(s"Finished executing 'DELETE FROM' command for $tableId")
-
-    Seq.empty[Row]
+    (filteredPlan, config)
   }
 
   def tryPruningDeleteRecordSchema(query: LogicalPlan, requiredColNames: 
Seq[String]): LogicalPlan = {

Reply via email to