xushiyan commented on code in PR #5272:
URL: https://github.com/apache/hudi/pull/5272#discussion_r849347351
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala:
##########
@@ -18,115 +18,69 @@
package org.apache.spark.sql.hudi.command
import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics,
CatalogTableType, HoodieCatalogTable}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop,
normalizePartitionSpec}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType,
HoodieCatalogTable}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import scala.util.control.NonFatal
-
/**
* Command for truncate hudi table.
*/
case class TruncateHoodieTableCommand(
tableIdentifier: TableIdentifier,
- partitionSpec: Option[TablePartitionSpec])
+ specs: Option[TablePartitionSpec])
extends HoodieLeafRunnableCommand {
- override def run(spark: SparkSession): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
logInfo(s"start execute truncate table command for $fullTableName")
- val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier)
- val properties = hoodieCatalogTable.tableConfig.getProps
-
- try {
- // Delete all data in the table directory
- val catalog = spark.sessionState.catalog
- val table = catalog.getTableMetadata(tableIdentifier)
- val tableIdentWithDB = table.identifier.quotedString
-
- if (table.tableType == CatalogTableType.VIEW) {
- throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
- }
+ val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
- if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
- throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not
supported " +
- s"for tables that are not partitioned: $tableIdentWithDB")
- }
+ val catalog = sparkSession.sessionState.catalog
+ val table = catalog.getTableMetadata(tableIdentifier)
+ val tableIdentWithDB = table.identifier.quotedString
- val basePath = hoodieCatalogTable.tableLocation
- val partCols = table.partitionColumnNames
- val locations = if (partitionSpec.isEmpty || partCols.isEmpty) {
- Seq(basePath)
- } else {
- val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map {
spec =>
- normalizePartitionSpec(
- spec,
- partCols,
- table.identifier.quotedString,
- spark.sessionState.conf.resolver)
- }.get)
-
- val fullPartitionPath = FSUtils.getPartitionPath(basePath,
getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec))
-
- Seq(fullPartitionPath)
- }
+ if (table.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException(
+ s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
+ }
- val hadoopConf = spark.sessionState.newHadoopConf()
- locations.foreach { location =>
- val path = new Path(location.toString)
- try {
- val fs = path.getFileSystem(hadoopConf)
- fs.delete(path, true)
- fs.mkdirs(path)
- } catch {
- case NonFatal(e) =>
- throw new AnalysisException(
- s"Failed to truncate table $tableIdentWithDB when removing data
of the path: $path " +
- s"because of ${e.toString}")
- }
- }
+ if (table.partitionColumnNames.isEmpty && specs.isDefined) {
+ throw new AnalysisException(
+ s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported
" +
+ s"for tables that are not partitioned: $tableIdentWithDB")
+ }
- // Also try to drop the contents of the table from the columnar cache
- try {
-
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier),
cascade = true)
- } catch {
- case NonFatal(_) =>
- }
+ val basePath = hoodieCatalogTable.tableLocation
+ val properties = hoodieCatalogTable.tableConfig.getProps
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
- if (table.stats.nonEmpty) {
- // empty table after truncation
- val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
- catalog.alterTableStats(tableIdentifier, Some(newStats))
- }
- Seq.empty[Row]
- } catch {
- // TruncateTableCommand will delete the related directories first, and
then refresh the table.
- // It will fail when refresh table, because the hudi meta
directory(.hoodie) has been deleted at the first step.
- // So here ignore this failure, and refresh table later.
- case NonFatal(e) =>
- throw new AnalysisException(s"Exception when attempting to truncate
table ${tableIdentifier.quotedString}: " + e)
- }
+ if (specs.isEmpty) {
+ val targetPath = new Path(basePath)
+ val engineContext = new
HoodieSparkEngineContext(sparkSession.sparkContext)
+ val fs = FSUtils.getFs(basePath,
sparkSession.sparkContext.hadoopConfiguration)
+ FSUtils.deleteDir(engineContext, fs, targetPath,
sparkSession.sparkContext.defaultParallelism)
- // If we have not specified the partition, truncate will delete all the
data in the table path
- // include the hoodie.properties. In this case we should reInit the table.
- if (partitionSpec.isEmpty) {
- val hadoopConf = spark.sessionState.newHadoopConf()
// ReInit hoodie.properties
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation)
- }
- // After deleting the data, refresh the table to make sure we don't keep
around a stale
- // file relation in the metastore cache and cached table data in the cache
manager.
-
spark.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
- Seq.empty[Row]
+ // After deleting the data, refresh the table to make sure we don't keep
around a stale
+ // file relation in the metastore cache and cached table data in the
cache manager.
+ sparkSession.catalog.refreshTable(table.identifier.quotedString)
+ Seq.empty[Row]
+ } else {
+ AlterHoodieTableDropPartitionCommand(tableIdentifier,
+ specs = Seq(specs.get),
+ ifExists = false,
+ purge = false,
+ retainData = false)
+ .run(sparkSession)
Review Comment:
I suggest not mixing the Drop implementation into the Truncate command. For
example, when logging occurs, it may log from Drop even though users are
actually running Truncate, which is misleading. Also, isolating the
implementation makes the logic easier to understand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]