danny0405 commented on code in PR #13176:
URL: https://github.com/apache/hudi/pull/13176#discussion_r2051629556
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -17,30 +17,46 @@
package org.apache.spark.sql.hudi.command
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
-import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter,
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter,
LogicalPlan, Project, UpdateTable}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
-case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends
HoodieLeafRunnableCommand
+case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query:
LogicalPlan, config: Map[String, String]) extends DataWritingCommand
with SparkAdapterSupport
with ProvidesHoodieConfig {
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
- .map(HoodieCatalogTable(sparkSession, _))
- .get
+ override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
- val tableId = catalogTable.table.qualifiedName
+ override def outputColumnNames: Seq[String] = {
+ query.output.map(_.name)
+ }
+ override def run(sparkSession: SparkSession, queryPlan: SparkPlan): Seq[Row]
= {
+ val tableId = catalogTable.table.qualifiedName
logInfo(s"Executing 'DELETE FROM' command for $tableId")
+ val df = sparkSession.internalCreateDataFrame(queryPlan.execute(),
queryPlan.schema)
Review Comment:
Did you validate that this line does not cause a performance regression?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]