Indhumathi27 commented on code in PR #4276:
URL: https://github.com/apache/carbondata/pull/4276#discussion_r907039904


##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+
+          // move  the partition files to trash folder which are dropped
+          val droppedPartitionNames = partitions.map { partition =>
+            partition.spec.map { specs => specs._1 + "=" + specs._2}
+          }
+          val droppedPartitionName = droppedPartitionNames
+            .head
+            .headOption
+            .get
+          TrashUtil.copyFilesToTrash(carbonPartitionsTobeDropped,
+            TrashUtil.getCompleteTrashFolderPath(

Review Comment:
   `getCompleteTrashFolderPath` is used to create the trash path for non-partition tables (it appends the segment prefix). For partition tables, I think we should create the path with the partition folder name instead.
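   A minimal sketch of this suggestion, assuming the `<tablePath>/.Trash/<timestamp>/<entry>` layout that `getCompleteTrashFolderPath` produces for segments; `trashPathForPartition` is a hypothetical helper, not an existing `TrashUtil` method:

   ```scala
   // Hypothetical helper: builds the trash destination from the dropped
   // partition's folder name (e.g. "deptname=Learning") instead of the
   // "Segment_<id>" prefix used for non-partition tables.
   def trashPathForPartition(tablePath: String,
       timestamp: Long,
       partitionFolderName: String): String = {
     s"$tablePath/.Trash/$timestamp/$partitionFolderName"
   }

   // Usage at the call site quoted above:
   TrashUtil.copyFilesToTrash(carbonPartitionsTobeDropped,
     trashPathForPartition(table.getTablePath,
       CarbonUpdateUtil.readCurrentTime(), droppedPartitionName))
   ```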



##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+
+          // move  the partition files to trash folder which are dropped
+          val droppedPartitionNames = partitions.map { partition =>
+            partition.spec.map { specs => specs._1 + "=" + specs._2}
+          }
+          val droppedPartitionName = droppedPartitionNames
+            .head
+            .headOption
+            .get
+          TrashUtil.copyFilesToTrash(carbonPartitionsTobeDropped,
+            TrashUtil.getCompleteTrashFolderPath(
+              table.getTablePath,
+              CarbonUpdateUtil.readCurrentTime(),
+              droppedPartitionName))
+          // Delete partition folder after copy to trash
+          carbonPartitionsTobeDropped.asScala.foreach(delPartition => {

Review Comment:
   Please format the new code changes
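   For instance, a hedged sketch of the formatting (spacing in closures and comment wording; behavior unchanged, and the deletion body is omitted since it is not quoted in this hunk):

   ```scala
   // Move the files of the dropped partitions to the trash folder.
   val droppedPartitionNames = partitions.map { partition =>
     partition.spec.map { case (column, value) => s"$column=$value" }
   }

   // Delete each partition folder after it has been copied to trash.
   carbonPartitionsTobeDropped.asScala.foreach { delPartition =>
     // (deletion logic from the PR, not quoted in this hunk)
   }
   ```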



##########
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala:
##########
@@ -218,6 +222,129 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
       Seq(Row(0)))
   }
 
+  test("dropping partition with moving data to trash") {
+    sql("drop table if exists dropPartition1")
+    sql(
+      """
+        | CREATE TABLE dropPartition1 (empno int, empname String, designation 
String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='Learning')""")
+    sql(s"""ALTER TABLE dropPartition1 DROP 
PARTITION(deptname='configManagement')""")
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='network')""")
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='protocol')""")
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='security')""")
+    val table = CarbonEnv.getCarbonTable(Option("default"), 
"dropPartition1")(sqlContext
+      .sparkSession)
+    val tablePath = table.getTablePath
+    val deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=Learning")
+    }
+    assert(deptname.length == 0)
+    val configManagement = 
FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=configManagement")
+    }
+    assert(configManagement.length == 0)
+    val network = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=network")
+    }
+    assert(network.length == 0)
+    val protocol = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=protocol")
+    }
+    assert(protocol.length == 0)
+    val security = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=protocol")
+    }
+    assert(security.length == 0)
+    sql("drop table if exists dropPartition1")

Review Comment:
   Please add a validation to check the data count after dropping the partitions.
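   For example, something along these lines just before the final `drop table`; the expected value here is a placeholder, since the correct count depends on how many rows of data.csv fall into the non-dropped partitions:

   ```scala
   // Placeholder expected count: replace 0 with the number of rows left
   // after dropping the five deptname partitions above.
   checkAnswer(sql("select count(*) from dropPartition1"), Seq(Row(0)))
   ```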



##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -30,13 +30,17 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants

Review Comment:
   Please remove unused imports



##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+
+          // move  the partition files to trash folder which are dropped
+          val droppedPartitionNames = partitions.map { partition =>
+            partition.spec.map { specs => specs._1 + "=" + specs._2}
+          }
+          val droppedPartitionName = droppedPartitionNames

Review Comment:
   Taking only the head option to create the partition folder name in trash is wrong here. For example, if the user drops a multi-level partition (i.e., `DROP PARTITION(deptname='Learning', doj='2015-12-01 00:00:00')`), the trash folder should also contain the multi-level partition folders, so the user can roll back the drop partition if needed. So, in trash, please copy the files under the respective multi-level partition folders.
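   For illustration, a hedged sketch that keeps every level of the spec instead of only the head, joining the levels into a nested folder path (assuming `/` as the separator in the trash layout):

   ```scala
   // One nested trash sub-path per dropped partition, e.g.
   // "deptname=Learning/doj=2015-12-01 00:00:00" for a two-level drop.
   val droppedPartitionPaths = partitions.map { partition =>
     partition.spec.map { case (column, value) => s"$column=$value" }.mkString("/")
   }
   ```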



##########
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala:
##########
@@ -494,9 +494,8 @@ test("Creation of partition table should fail if the colname in table schema and
       sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""")
       checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10)))
       sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
-      checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-      checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30)))
+      checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))

Review Comment:
   Why this change?


