Indhumathi27 commented on code in PR #4276: URL: https://github.com/apache/carbondata/pull/4276#discussion_r907039904
########## integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala: ########## @@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand( ifExists, purge, retainData).run(sparkSession) + + // move the partition files to trash folder which are dropped + val droppedPartitionNames = partitions.map { partition => + partition.spec.map { specs => specs._1 + "=" + specs._2} + } + val droppedPartitionName = droppedPartitionNames + .head + .headOption + .get + TrashUtil.copyFilesToTrash(carbonPartitionsTobeDropped, + TrashUtil.getCompleteTrashFolderPath( Review Comment: `getCompleteTrashFolderPath` is used to create Path for Non-partition table(appends Segment Prefix), I think, for partition table, we should create Path with Partition folder name ########## integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala: ########## @@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand( ifExists, purge, retainData).run(sparkSession) + + // move the partition files to trash folder which are dropped + val droppedPartitionNames = partitions.map { partition => + partition.spec.map { specs => specs._1 + "=" + specs._2} + } + val droppedPartitionName = droppedPartitionNames + .head + .headOption + .get + TrashUtil.copyFilesToTrash(carbonPartitionsTobeDropped, + TrashUtil.getCompleteTrashFolderPath( + table.getTablePath, + CarbonUpdateUtil.readCurrentTime(), + droppedPartitionName)) + // Delete partition folder after copy to trash + carbonPartitionsTobeDropped.asScala.foreach(delPartition => { Review Comment: Please format the new code changes ########## integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala: ########## @@ -218,6 +222,129 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl Seq(Row(0))) } + 
test("dropping partition with moving data to trash") { + sql("drop table if exists dropPartition1") + sql( + """ + | CREATE TABLE dropPartition1 (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED AS carbondata + """.stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='Learning')""") + sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='configManagement')""") + sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='network')""") + sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='protocol')""") + sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='security')""") + val table = CarbonEnv.getCarbonTable(Option("default"), "dropPartition1")(sqlContext + .sparkSession) + val tablePath = table.getTablePath + val deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter{ + file => file.getName.equalsIgnoreCase("deptname=Learning") + } + assert(deptname.length == 0) + val configManagement = FileFactory.getCarbonFile(tablePath).listFiles().filter{ + file => file.getName.equalsIgnoreCase("deptname=configManagement") + } + assert(configManagement.length == 0) + val network = 
FileFactory.getCarbonFile(tablePath).listFiles().filter{ + file => file.getName.equalsIgnoreCase("deptname=network") + } + assert(network.length == 0) + val protocol = FileFactory.getCarbonFile(tablePath).listFiles().filter{ + file => file.getName.equalsIgnoreCase("deptname=protocol") + } + assert(protocol.length == 0) + val security = FileFactory.getCarbonFile(tablePath).listFiles().filter{ + file => file.getName.equalsIgnoreCase("deptname=protocol") + } + assert(security.length == 0) + sql("drop table if exists dropPartition1") Review Comment: please add a validation to check the data count after drop partition ########## integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala: ########## @@ -30,13 +30,17 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants Review Comment: Please remove unused imports ########## integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala: ########## @@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand( ifExists, purge, retainData).run(sparkSession) + + // move the partition files to trash folder which are dropped + val droppedPartitionNames = partitions.map { partition => + partition.spec.map { specs => specs._1 + "=" + specs._2} + } + val droppedPartitionName = droppedPartitionNames Review Comment: Taking Head Option to create Partition folder name in Trash is wrong here. For example, consider a case, where user performs drop multi-level partition (i.e., `DROP PARTITION(deptname='Learning',doj='2015-12-01 00:00:00' )`) in this case, Trash folder also should have multi-level partition folder, to roll back drop partition if user wants. 
So, in Trash, please copy under respective multi-level partition folders ########## integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala: ########## @@ -494,9 +494,8 @@ test("Creation of partition table should fail if the colname in table schema and sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""") checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10))) sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""") - checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20))) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30))) + checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20))) Review Comment: why this change ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org