[CARBONDATA-2049] CarbonCleanFilesCommand table path problem Problem: In CarbonCleanFilesCommand databaseLocation is being passed instead of the tablePath in case of force clean. And in case of cleanGarbageData, storeLocation is being passed instead of the tablePath.
This closes #1828 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9b479617 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9b479617 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9b479617 Branch: refs/heads/fgdatamap Commit: 9b4796177610e3a4f9d426169753a40eceb7b675 Parents: d509f17 Author: mohammadshahidkhan <[email protected]> Authored: Tue Jan 16 11:49:54 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Wed Jan 31 09:37:39 2018 +0800 ---------------------------------------------------------------------- .../org/apache/carbondata/api/CarbonStore.scala | 32 ++++++++++++++------ .../management/CarbonCleanFilesCommand.scala | 25 +++++++-------- .../org/apache/spark/util/CleanFiles.scala | 32 ++++++++++++++------ .../apache/spark/util/CarbonCommandSuite.scala | 3 +- 4 files changed, 59 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index d514f77..c02ba0a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -98,34 +98,46 @@ object CarbonStore { } } + /** + * The method deletes all data if forceTableClean <true> and cleans garbage segments + * (MARKED_FOR_DELETE state) if forceTableClean <false> + * + * @param dbName : Database name + * @param tableName : Table name + * @param tablePath : Table path + * @param carbonTable : CarbonTable Object <null> in case of force clean 
+ * @param forceTableClean : <true> for force clean it will delete all data + * <false> it will clean garbage segment (MARKED_FOR_DELETE state) + * @param currentTablePartitions : Hive Partitions details + */ def cleanFiles( dbName: String, tableName: String, - storePath: String, + tablePath: String, carbonTable: CarbonTable, forceTableClean: Boolean, currentTablePartitions: Option[Seq[String]] = None): Unit = { LOGGER.audit(s"The clean files request has been received for $dbName.$tableName") var carbonCleanFilesLock: ICarbonLock = null - var absoluteTableIdentifier: AbsoluteTableIdentifier = null - if (forceTableClean) { - absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName) + val absoluteTableIdentifier = if (forceTableClean) { + AbsoluteTableIdentifier.from(tablePath, dbName, tableName) } else { - absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier + carbonTable.getAbsoluteTableIdentifier } try { val errorMsg = "Clean files request is failed for " + s"$dbName.$tableName" + ". Not able to acquire the clean files lock due to another clean files " + "operation is running in the background." 
- carbonCleanFilesLock = - CarbonLockUtil.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) + // in case of force clean the lock is not required if (forceTableClean) { - val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName) FileFactory.deleteAllCarbonFilesOfDir( - FileFactory.getCarbonFile(absIdent.getTablePath, - FileFactory.getFileType(absIdent.getTablePath))) + FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath, + FileFactory.getFileType(absoluteTableIdentifier.getTablePath))) } else { + carbonCleanFilesLock = + CarbonLockUtil + .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) DataLoadingUtil.deleteLoadsAndUpdateMetadata( isForceDeletion = true, carbonTable) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index 303c3ef..4b68bd0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} import 
org.apache.carbondata.spark.util.CommonUtil @@ -70,12 +70,13 @@ case class CarbonCleanFilesCommand( databaseNameOp: Option[String], tableName: String): Unit = { val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession) + val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName CarbonStore.cleanFiles( - dbName, - tableName, - databaseLocation, - null, - forceTableClean) + dbName = dbName, + tableName = tableName, + tablePath = tablePath, + carbonTable = null, // in case of delete all data carbonTable is not required. + forceTableClean = forceTableClean) } private def cleanGarbageData(sparkSession: SparkSession, @@ -90,12 +91,12 @@ case class CarbonCleanFilesCommand( None } CarbonStore.cleanFiles( - CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), - tableName, - CarbonProperties.getStorePath, - carbonTable, - forceTableClean, - partitions) + dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), + tableName = tableName, + tablePath = carbonTable.getTablePath, + carbonTable = carbonTable, + forceTableClean = forceTableClean, + currentTablePartitions = partitions) } // Clean garbage data in all tables in all databases http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala index eba7dcd..d4d9a84 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala @@ -29,19 +29,30 @@ import org.apache.carbondata.api.CarbonStore object CleanFiles { /** - * Clean the stale segments from table - * @param spark - * @param dbName - * @param 
tableName - * @param storePath - * @param forceTableClean if true, it deletes the table and its contents with force.It does not + * The method deletes all data if forceTableClean <true> and cleans garbage segments + * (MARKED_FOR_DELETE state) if forceTableClean <false> + * + * @param spark : Spark session + * @param dbName : Database name + * @param tableName : Table name + * @param forceTableClean : if true, it deletes the table and its contents with force. It does not * drop table from hive metastore so should be very careful to use it. */ def cleanFiles(spark: SparkSession, dbName: String, tableName: String, - storePath: String, forceTableClean: Boolean = false): Unit = { + forceTableClean: Boolean = false): Unit = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark) - CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean) + val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(spark) + val carbonTable = if (!forceTableClean) { + CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark) + } else { + null + } + CarbonStore.cleanFiles( + dbName = dbName, + tableName = tableName, + tablePath = tablePath, + carbonTable = carbonTable, + forceTableClean = forceTableClean) } def main(args: Array[String]): Unit = { @@ -60,6 +71,7 @@ object CleanFiles { val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName") CarbonEnv.getInstance(spark).carbonMetastore. 
checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) - cleanFiles(spark, dbName, tableName, storePath, forceTableClean) + + cleanFiles(spark, dbName, tableName, forceTableClean) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b479617/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala index e493179..8ff6cab 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala @@ -142,7 +142,8 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll { dropTable(table) createAndLoadTestTable(table, "csv_table") CleanFiles.main(Array(s"${location}", table, "true")) - val tablePath = s"${location}${File.separator}default${File.separator}$table" + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table) + val tablePath = carbonTable.getTablePath val f = new File(tablePath) assert(!f.exists())
