danny0405 commented on code in PR #13176:
URL: https://github.com/apache/hudi/pull/13176#discussion_r2051629556


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -17,30 +17,46 @@
 
 package org.apache.spark.sql.hudi.command
 
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, 
SPARK_SQL_WRITES_PREPPED_KEY}
-import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.table.HoodieTableConfig
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, Project, UpdateTable}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import 
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
-case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends 
HoodieLeafRunnableCommand
+case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query: 
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
   with SparkAdapterSupport
   with ProvidesHoodieConfig {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
-      .map(HoodieCatalogTable(sparkSession, _))
-      .get
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
-    val tableId = catalogTable.table.qualifiedName
+  override def outputColumnNames: Seq[String] = {
+    query.output.map(_.name)
+  }
 
+  override def run(sparkSession: SparkSession, queryPlan: SparkPlan): Seq[Row] 
= {
+    val tableId = catalogTable.table.qualifiedName
     logInfo(s"Executing 'DELETE FROM' command for $tableId")
+    val df = sparkSession.internalCreateDataFrame(queryPlan.execute(), 
queryPlan.schema)

Review Comment:
   Did you validate that this line does not cause a performance regression?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to