wangyinsheng commented on code in PR #13176:
URL: https://github.com/apache/hudi/pull/13176#discussion_r2051729552


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -17,30 +17,46 @@
 
 package org.apache.spark.sql.hudi.command
 
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, 
SPARK_SQL_WRITES_PREPPED_KEY}
-import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.table.HoodieTableConfig
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, Project, UpdateTable}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import 
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
-case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends 
HoodieLeafRunnableCommand
+case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query: 
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
   with SparkAdapterSupport
   with ProvidesHoodieConfig {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
-      .map(HoodieCatalogTable(sparkSession, _))
-      .get
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
-    val tableId = catalogTable.table.qualifiedName
+  override def outputColumnNames: Seq[String] = {
+    query.output.map(_.name)
+  }
 
+  override def run(sparkSession: SparkSession, queryPlan: SparkPlan): Seq[Row] 
= {
+    val tableId = catalogTable.table.qualifiedName
     logInfo(s"Executing 'DELETE FROM' command for $tableId")
+    val df = sparkSession.internalCreateDataFrame(queryPlan.execute(), 
queryPlan.schema)

Review Comment:
   This is a performance comparison of three methods for creating a DataFrame.
   ```
   val benchmark = new HoodieBenchmark("DataFrame Compare", 1000000, 10)
   val data = createComplexDataFrame(1000000)
   data.createOrReplaceTempView("input_df")
   val df = spark.sql("select * from input_df where t1 > 1 ")
   val logicalPlan = df.logicalPlan
   val sparkPlan = df.queryExecution.sparkPlan
   
   Seq("Dataset.ofRows", "spark.internalCreateDataFrame", 
"spark.createDataFrame").foreach { convertType =>
     benchmark.addCase(convertType) { _ =>
       convertType match {
         case "Dataset.ofRows" =>
           val newDF = Dataset.ofRows(spark, logicalPlan)
           newDF.count()
         case "spark.internalCreateDataFrame" =>
           val newDF = spark.internalCreateDataFrame(sparkPlan.execute(), 
df.schema)
           newDF.count()
         case "spark.createDataFrame" =>
           val rowSerDe = sparkAdapter.createSparkRowSerDe(df.schema)
           val rows = sparkPlan.execute().map(rowSerDe.deserializeRow)
           val newDF = spark.createDataFrame(rows, df.schema)
           newDF.count()
       }
     }
   }
   benchmark.run()
   ```
   
   result:
   ```
   Java HotSpot(TM) 64-Bit Server VM 1.8.0_411-b09 on Mac OS X 15.3.1
   Apple M3 Pro
   pref bulk insert:                         Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   
------------------------------------------------------------------------------------------------------------------------
   Dataset.ofRows                                       20             25       
    5         49.8          20.1       1.0X
   spark.internalCreateDataFrame                        17             22       
    9         59.3          16.9       1.2X
   spark.createDataFrame                                20             31       
   12         50.1          20.0       1.0X
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to