wangyinsheng commented on code in PR #13176:
URL: https://github.com/apache/hudi/pull/13176#discussion_r2051729552
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala:
##########
@@ -17,30 +17,46 @@
 package org.apache.spark.sql.hudi.command
 
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, SPARK_SQL_WRITES_PREPPED_KEY}
-import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.table.HoodieTableConfig
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, LogicalPlan, Project, UpdateTable}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 
-case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunnableCommand
+case class DeleteHoodieTableCommand(catalogTable: HoodieCatalogTable, query: LogicalPlan, config: Map[String, String]) extends DataWritingCommand
   with SparkAdapterSupport
   with ProvidesHoodieConfig {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalogTable = sparkAdapter.resolveHoodieTable(dft.table)
-      .map(HoodieCatalogTable(sparkSession, _))
-      .get
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
-    val tableId = catalogTable.table.qualifiedName
+  override def outputColumnNames: Seq[String] = {
+    query.output.map(_.name)
+  }
 
+  override def run(sparkSession: SparkSession, queryPlan: SparkPlan): Seq[Row] = {
+    val tableId = catalogTable.table.qualifiedName
     logInfo(s"Executing 'DELETE FROM' command for $tableId")
 
+    val df = sparkSession.internalCreateDataFrame(queryPlan.execute(), queryPlan.schema)
Review Comment:
Here is a performance comparison of three ways to turn the executed physical plan back into a DataFrame:
```
// Benchmark three ways of converting an executed plan into a DataFrame.
val benchmark = new HoodieBenchmark("DataFrame Compare", 1000000, 10)
val data = createComplexDataFrame(1000000)
data.createOrReplaceTempView("input_df")
val df = spark.sql("select * from input_df where t1 > 1")
val logicalPlan = df.logicalPlan
val sparkPlan = df.queryExecution.sparkPlan
Seq("Dataset.ofRows", "spark.internalCreateDataFrame", "spark.createDataFrame").foreach { convertType =>
  benchmark.addCase(convertType) { _ =>
    convertType match {
      case "Dataset.ofRows" =>
        // Re-plans from the logical plan.
        val newDF = Dataset.ofRows(spark, logicalPlan)
        newDF.count()
      case "spark.internalCreateDataFrame" =>
        // Wraps the RDD[InternalRow] directly, with no per-row conversion.
        val newDF = spark.internalCreateDataFrame(sparkPlan.execute(), df.schema)
        newDF.count()
      case "spark.createDataFrame" =>
        // Deserializes each InternalRow to an external Row first.
        val rowSerDe = sparkAdapter.createSparkRowSerDe(df.schema)
        val rows = sparkPlan.execute().map(rowSerDe.deserializeRow)
        val newDF = spark.createDataFrame(rows, df.schema)
        newDF.count()
    }
  }
}
benchmark.run()
```
result:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_411-b09 on Mac OS X 15.3.1
Apple M3 Pro
pref bulk insert:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Dataset.ofRows                                20             25           5        49.8          20.1        1.0X
spark.internalCreateDataFrame                 17             22           9        59.3          16.9        1.2X
spark.createDataFrame                         20             31          12        50.1          20.0        1.0X
```
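For context, `internalCreateDataFrame` is `private[sql]`, so it is only callable from code compiled under the `org.apache.spark.sql` package, which this command satisfies via its `org.apache.spark.sql.hudi.command` package. A minimal sketch of the pattern the new `run` relies on (the class name is hypothetical, not the PR's actual class, and it assumes Spark 3.2+ for `withNewChildInternal`):
```
package org.apache.spark.sql.hudi.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

// Hypothetical command, for illustration only.
case class ExampleDeleteCommand(query: LogicalPlan) extends DataWritingCommand {

  override def outputColumnNames: Seq[String] = query.output.map(_.name)

  // Required by TreeNode on Spark 3.2+.
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(query = newChild)

  override def run(sparkSession: SparkSession, queryPlan: SparkPlan): Seq[Row] = {
    // queryPlan.execute() yields an RDD[InternalRow]; internalCreateDataFrame
    // wraps it as-is, skipping the InternalRow -> Row deserialization that
    // sparkSession.createDataFrame(rowRdd, schema) would require.
    val df = sparkSession.internalCreateDataFrame(queryPlan.execute(), queryPlan.schema)
    // ... hand df to the writer here ...
    Seq.empty[Row]
  }
}
```
That per-row deserialization (the `SparkRowSerDe` round trip) is exactly what the `spark.createDataFrame` case has to pay for, which matches it having the worst average time of the three above.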