alexeykudinkin commented on code in PR #5272:
URL: https://github.com/apache/hudi/pull/5272#discussion_r849946852


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala:
##########
@@ -18,115 +18,83 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieSparkSqlWriter
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, 
HoodieCatalogTable}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, 
normalizePartitionSpec}
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-
-import scala.util.control.NonFatal
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 /**
  * Command for truncate hudi table.
  */
 case class TruncateHoodieTableCommand(
    tableIdentifier: TableIdentifier,
-   partitionSpec: Option[TablePartitionSpec])
-  extends HoodieLeafRunnableCommand {
+   specs: Option[TablePartitionSpec])
+  extends HoodieLeafRunnableCommand with ProvidesHoodieConfig {
 
-  override def run(spark: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
     logInfo(s"start execute truncate table command for $fullTableName")
 
-    val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier)
-    val properties = hoodieCatalogTable.tableConfig.getProps
-
-    try {
-      // Delete all data in the table directory
-      val catalog = spark.sessionState.catalog
-      val table = catalog.getTableMetadata(tableIdentifier)
-      val tableIdentWithDB = table.identifier.quotedString
-
-      if (table.tableType == CatalogTableType.VIEW) {
-        throw new AnalysisException(
-          s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
-      }
-
-      if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
-        throw new AnalysisException(
-          s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not 
supported " +
-            s"for tables that are not partitioned: $tableIdentWithDB")
-      }
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
 
-      val basePath = hoodieCatalogTable.tableLocation
-      val partCols = table.partitionColumnNames
-      val locations = if (partitionSpec.isEmpty || partCols.isEmpty) {
-        Seq(basePath)
-      } else {
-        val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map { 
spec =>
-          normalizePartitionSpec(
-            spec,
-            partCols,
-            table.identifier.quotedString,
-            spark.sessionState.conf.resolver)
-        }.get)
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableIdentifier)
+    val tableIdentWithDB = table.identifier.quotedString
 
-        val fullPartitionPath = FSUtils.getPartitionPath(basePath, 
getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec))
-
-        Seq(fullPartitionPath)
-      }
+    if (table.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
+    }
 
-      val hadoopConf = spark.sessionState.newHadoopConf()
-      locations.foreach { location =>
-        val path = new Path(location.toString)
-        try {
-          val fs = path.getFileSystem(hadoopConf)
-          fs.delete(path, true)
-          fs.mkdirs(path)
-        } catch {
-          case NonFatal(e) =>
-            throw new AnalysisException(
-              s"Failed to truncate table $tableIdentWithDB when removing data 
of the path: $path " +
-                s"because of ${e.toString}")
-        }
-      }
+    if (table.partitionColumnNames.isEmpty && specs.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported 
" +
+          s"for tables that are not partitioned: $tableIdentWithDB")
+    }
 
-      // Also try to drop the contents of the table from the columnar cache
-      try {
-        
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), 
cascade = true)
-      } catch {
-        case NonFatal(_) =>
-      }
+    val basePath = hoodieCatalogTable.tableLocation
+    val properties = hoodieCatalogTable.tableConfig.getProps
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
-      if (table.stats.nonEmpty) {
-        // empty table after truncation
-        val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
-        catalog.alterTableStats(tableIdentifier, Some(newStats))
-      }
-      Seq.empty[Row]
-    } catch {
-      // TruncateTableCommand will delete the related directories first, and 
then refresh the table.
-      // It will fail when refresh table, because the hudi meta 
directory(.hoodie) has been deleted at the first step.
-      // So here ignore this failure, and refresh table later.
-      case NonFatal(e) =>
-        throw new AnalysisException(s"Exception when attempting to truncate 
table ${tableIdentifier.quotedString}: " + e)
-    }
+    if (specs.isEmpty) {
+      val targetPath = new Path(basePath)
+      val engineContext = new 
HoodieSparkEngineContext(sparkSession.sparkContext)
+      val fs = FSUtils.getFs(basePath, 
sparkSession.sparkContext.hadoopConfiguration)
+      FSUtils.deleteDir(engineContext, fs, targetPath, 
sparkSession.sparkContext.defaultParallelism)
 
-    // If we have not specified the partition, truncate will delete all the 
data in the table path

Review Comment:
   Let's keep the commentary to explain the semantic



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala:
##########
@@ -52,33 +52,6 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
     checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
   }
 
-  test("Purge drop non-partitioned table") {

Review Comment:
   Instead of removing these tests shall we just force cleaner instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to