This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7db96c71473 [HUDI-9324]Support displaying complete dag for delete
statement in spark web ui (#13176)
7db96c71473 is described below
commit 7db96c714739de7adc5f38f38c90b4455d50b9ee
Author: wangyinsheng <[email protected]>
AuthorDate: Mon Apr 21 07:21:46 2025 +0800
[HUDI-9324]Support displaying complete dag for delete statement in spark
web ui (#13176)
Co-authored-by: wangyinsheng <[email protected]>
---
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 6 ++-
.../hudi/command/DeleteHoodieTableCommand.scala | 46 ++++++++++++----------
2 files changed, 29 insertions(+), 23 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 7e75d139197..be7e7ceabfb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -437,8 +437,10 @@ case class ResolveImplementations(sparkSession: SparkSession) extends Rule[Logic
// Convert to DeleteHoodieTableCommand
- case dft@DeleteFromTable(plan@ResolvesToHudiTable(_), _) if
dft.resolved =>
- DeleteHoodieTableCommand(dft)
+ case dft@DeleteFromTable(ResolvesToHudiTable(table), _) if
dft.resolved =>
+ val catalogTable = new HoodieCatalogTable(sparkSession, table)
+ val (plan, config) =
DeleteHoodieTableCommand.inputPlan(sparkSession, dft, catalogTable)
+ DeleteHoodieTableCommand(catalogTable, plan, config)
// Convert to CompactionHoodieTableCommand
case ct @ CompactionTable(plan @ ResolvesToHudiTable(table),
operation, options) if ct.resolved =>
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index ecbd53753b3..998f50f37ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -17,30 +17,46 @@
package org.apache.spark.sql.hudi.command
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
-import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter,
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter,
LogicalPlan, Project, UpdateTable}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
-case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends
HoodieLeafRunnableCommand
+case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query:
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
with SparkAdapterSupport
with ProvidesHoodieConfig {
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
- .map(HoodieCatalogTable(sparkSession, _))
- .get
+ override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
- val tableId = catalogTable.table.qualifiedName
+ override def outputColumnNames: Seq[String] = {
+ query.output.map(_.name)
+ }
+ override def run(sparkSession: SparkSession, queryPlan: SparkPlan): Seq[Row]
= {
+ val tableId = catalogTable.table.qualifiedName
logInfo(s"Executing 'DELETE FROM' command for $tableId")
+ val df = sparkSession.internalCreateDataFrame(queryPlan.execute(),
queryPlan.schema)
+ HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append,
config, df)
+ sparkSession.catalog.refreshTable(tableId)
+ logInfo(s"Finished executing 'DELETE FROM' command for $tableId")
+ Seq.empty[Row]
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
LogicalPlan = copy(query = newChild)
+}
+object DeleteHoodieTableCommand extends SparkAdapterSupport with
ProvidesHoodieConfig{
+
+ def inputPlan(sparkSession: SparkSession, dft: DeleteFromTable,
catalogTable: HoodieCatalogTable): (LogicalPlan, Map[String, String]) = {
val condition = sparkAdapter.extractDeleteCondition(dft)
val config = if
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
@@ -74,19 +90,7 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn
} else {
targetLogicalPlan
}
-
- val df = Dataset.ofRows(sparkSession, filteredPlan)
-
- df.write.format("hudi")
- .mode(SaveMode.Append)
- .options(config)
- .save()
-
- sparkSession.catalog.refreshTable(tableId)
-
- logInfo(s"Finished executing 'DELETE FROM' command for $tableId")
-
- Seq.empty[Row]
+ (filteredPlan, config)
}
def tryPruningDeleteRecordSchema(query: LogicalPlan, requiredColNames:
Seq[String]): LogicalPlan = {