This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 04b175620f [CARBONDATA-4338] Moving dropped partition data to trash
04b175620f is described below

commit 04b175620fedbb5555258a81081b2f691fa0ee5e
Author: Mahesh Raju Somalaraju <[email protected]>
AuthorDate: Mon Jun 6 16:52:28 2022 +0530

    [CARBONDATA-4338] Moving dropped partition data to trash
    
    Why is this PR needed?
    When a drop partition operation is performed, CarbonData
    modifies only the table status file and does not delete the
    actual partition folder, which contains the data and index
    files. To comply with Hive behaviour, CarbonData should also
    delete the dropped partition folder in storage (HDFS, OBS,
    etc.). Before deleting it, CarbonData keeps a copy in the Trash
    folder, so users can restore it by checking the partition name
    and timestamp.
    
    What changes were proposed in this PR?
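    Moved the dropped partition folder files to the trash folder and
    then deleted the partition folder. The behaviour is controlled by
    carbon.enable.partitiondata.trash, which is disabled by default.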
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4276
---
 .../core/constants/CarbonCommonConstants.java      |  10 ++
 .../carbondata/core/util/CleanFilesUtil.java       |   4 +-
 .../org/apache/carbondata/core/util/TrashUtil.java |  41 ++++++
 docs/configuration-parameters.md                   |   1 +
 docs/ddl-of-carbondata.md                          |   1 +
 docs/faq.md                                        |   7 +-
 .../CarbonAlterTableDropHivePartitionCommand.scala |  30 ++++-
 .../StandardPartitionTableDropTestCase.scala       | 146 ++++++++++++++++++++-
 8 files changed, 236 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 663f7d21cb..023137e815 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2886,5 +2886,15 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_ENABLE_MULTI_VERSION_TABLE_STATUS_DEFAULT 
= "false";
 
+  /**
+   * When enabled, ALTER TABLE DROP PARTITION copies the dropped partition data
+   * into the table's trash folder and then removes the partition folder.
+   * Disabled by default; set to "true" to enable.
+   */
+  @CarbonProperty
+  public static final String CARBON_ENABLE_PARTITION_DATA_TRASH =
+      "carbon.enable.partitiondata.trash";
 
+  /**
+   * Default value of {@link #CARBON_ENABLE_PARTITION_DATA_TRASH}: the feature is disabled.
+   */
+  public static final String CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT = 
"false";
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index bfa9b949fc..e395b7a622 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -206,10 +206,12 @@ public class CleanFilesUtil {
   /**
    * This method will delete all the empty partition folders starting from the 
table path
    */
-  private static void deleteEmptyPartitionFoldersRecursively(CarbonFile 
tablePath) {
+  public static void deleteEmptyPartitionFoldersRecursively(CarbonFile 
tablePath) {
     CarbonFile[] listOfFiles = tablePath.listFiles();
     if (listOfFiles.length == 0) {
       tablePath.delete();
+      // if parent file folder also empty then delete that too.
+      deleteEmptyPartitionFoldersRecursively(tablePath.getParentFile());
     } else {
       for (CarbonFile file: listOfFiles) {
         if (file.isDirectory() && file.getName().contains("=")) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index 47d196b7d9..6e6131b541 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -268,4 +268,45 @@ public final class TrashUtil {
       timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonTablePath
       .SEGMENT_PREFIX + segmentNumber;
   }
+
+  /**
+   * Builds the trash destination path for a dropped partition:
+   * {@code <trash folder of tablePath>/<timeStampSubFolder>/<partitionName>}.
+   *
+   * @param tablePath          absolute table path
+   * @param timeStampSubFolder timestamp used as the trash sub-folder name; the drop
+   *                           partition command passes the time of the operation
+   * @param partitionName      name of the partition whose files are moved to the trash
+   *                           folder; the drop partition command joins multi-level
+   *                           partition specs with "/", giving nested sub-folders
+   * @return the complete trash folder path for the partition
+   */
+  public static String getCompleteTrashFolderPathForPartition(String tablePath,
+      long timeStampSubFolder, String partitionName) {
+    return CarbonTablePath.getTrashFolderPath(tablePath) + 
CarbonCommonConstants.FILE_SEPARATOR
+        + timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR
+        + partitionName;
+  }
+
+  /**
+   * Copies every entry listed in a dropped partition folder into a trash folder,
+   * creating the trash folder first if it does not exist.
+   *
+   * @param filesToCopy              absolute path of the partition folder whose entries
+   *                                 are copied to the trash folder
+   * @param trashFolderWithTimestamp trash folder path including the timestamp
+   *                                 sub-folder and the partition name
+   */
+  public static void copyPartitionDataToTrash(String filesToCopy, String 
trashFolderWithTimestamp) {
+    try {
+      if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+        FileFactory.mkdirs(trashFolderWithTimestamp);
+      }
+      // check the partition folder exists before copying; each entry it lists is
+      // handed to copyFileToTrashFolder
+      if (FileFactory.isFileExist(filesToCopy)) {
+        CarbonFile folder = FileFactory.getCarbonFile(filesToCopy);
+        CarbonFile[] dataFiles = folder.listFiles();
+        for (CarbonFile carbonFile : dataFiles) {
+          copyFileToTrashFolder(carbonFile.getAbsolutePath(), 
trashFolderWithTimestamp);
+        }
+      } else {
+        // NOTE(review): the warning does not include the missing path
+        LOGGER.warn("Folder not copied to trash as partition folder does not 
exist");
+      }
+    } catch (IOException e) {
+      // NOTE(review): this catch wraps the whole loop, so one failed copy skips every
+      // remaining file, and any IOException is only logged as "file not found".
+      // A caller that deletes the partition folder afterwards would lose the
+      // uncopied files; consider catching per file and reporting failure so the
+      // caller can skip the delete.
+      LOGGER.warn("Unable to copy file to trash folder as file not found.", e);
+    }
+  }
 }
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 39064d4ae9..38b4f11233 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -58,6 +58,7 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.lock.class | (none) | This specifies the implementation of 
ICarbonLock interface to be used for acquiring the locks in case of concurrent 
operations                                                                      
                                                                                
                                                                                
                                                                                
                          [...]
 | carbon.data.file.version | V3 | This specifies carbondata file format 
version. Carbondata file format has evolved with time from V1 to V3 in terms of 
metadata storage and IO level pruning capabilities. You can find more details 
[here](https://carbondata.apache.org/file-structure-of-carbondata.html#carbondata-file-format).
                                                                                
                                                                                
        [...]
 | spark.carbon.hive.schema.store | false | Carbondata currently supports 2 
different types of metastores for storing schemas. This property specifies if 
Hive metastore is to be used for storing and retrieving table schemas           
                                                                                
                                                                                
                                                                                
                    [...]
+| carbon.enable.partitiondata.trash | false | When enabled, the data of a dropped partition is moved to the trash folder during the ALTER TABLE DROP PARTITION operation.
 
 ## Data Loading Configuration
 
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3d04684c29..ab2781f579 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -1107,6 +1107,7 @@ Users can specify which columns to include and exclude 
for local dictionary gene
   ALTER TABLE locationTable DROP PARTITION (country = 'US');
   ```
 
+   **NOTE:** Enable [carbon.enable.partitiondata.trash](./configuration-parameters.md#system-configuration) to move the dropped partition data to the trash folder during ALTER TABLE DROP PARTITION.
 #### Insert OVERWRITE
 
   This command allows you to insert or load overwrite on a specific partition.
diff --git a/docs/faq.md b/docs/faq.md
index cde3e9e4da..0e28abc448 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -31,6 +31,7 @@
 * [How to deal with the trailing task in 
query?](#How-to-deal-with-the-trailing-task-in-query)
 * [How to manage hybrid file format in carbondata 
table?](#How-to-manage-hybrid-file-format-in-carbondata-table)
 * [How to recover table status file if 
lost?](#How-to-recover-table-status-file-if-lost)
+* [Why deleted partition data still showing in file 
system?](#why-deleted-partition-data-still-showing-in-file-system)
 
 # TroubleShooting
 
@@ -481,4 +482,8 @@ TableStatusRecovery.main(args) --> args is of length two: 
1. Database Name 2. Ta
 TableStatus Recovery tool cannot recover table status version files for the 
below two scenarios
 1. After compaction, if table status file is lost, cannot recover compacted 
commit transaction, as the lost version file only has merged load details.
 2. After Delete segment by Id/Date, if table status file is lost, cannot 
recover deleted segment commit transaction, as the lost version file only has 
the segment status as deleted.
-3. Table status recovery on materialized view table is not supported.
\ No newline at end of file
+3. Table status recovery on materialized view table is not supported.
+
+## Why deleted partition data still showing in file system
+By default, dropped partition data is not physically removed from the table store until the table is dropped.
+Enable the carbon.enable.partitiondata.trash property to move the dropped partition data to the trash folder during the ALTER TABLE DROP PARTITION operation itself.
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index d7d317f4f3..8399a3aa1f 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -31,13 +31,15 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.IndexStoreManager
-import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CleanFilesUtil, TrashUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
 
@@ -106,6 +108,32 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+          val isPartitionDataTrashEnabled = CarbonProperties.getInstance()
+            
.getProperty(CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+              
CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT).toBoolean
+          if (isPartitionDataTrashEnabled) {
+            // move  the partition files to trash folder which are dropped
+            val droppedPartitionNames = partitions.map { partition =>
+              partition.spec.map { specs => specs._1 + 
CarbonCommonConstants.EQUALS + specs._2 }
+            }
+            val timeStamp = System.currentTimeMillis()
+            droppedPartitionNames.zipWithIndex.foreach { partitionName =>
+              val droppedPartitionName = 
droppedPartitionNames(partitionName._2).mkString("/")
+              
TrashUtil.copyPartitionDataToTrash(carbonPartitionsTobeDropped.get(partitionName._2),
+                TrashUtil.getCompleteTrashFolderPathForPartition(
+                  table.getTablePath,
+                  timeStamp,
+                  droppedPartitionName))
+            }
+            // Delete partition folder after copy to trash
+            carbonPartitionsTobeDropped.asScala.foreach(delPartition => {
+              val partitionPath = FileFactory.getCarbonFile(delPartition)
+              CarbonUtil.deleteFoldersAndFiles(partitionPath)
+            })
+            // Finally delete empty partition folders.
+            CleanFilesUtil.deleteEmptyPartitionFoldersRecursively(FileFactory
+              .getCarbonFile(table.getTablePath))
+          }
         }
       } catch {
         case e: Exception =>
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 0148d02140..ab80826859 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -17,18 +17,22 @@
 
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import java.io.File
 import java.nio.file.{Files, LinkOption, Paths}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class StandardPartitionTableDropTestCase extends QueryTest with 
BeforeAndAfterAll {
   // scalastyle:off lineLength
+  var count = 0
   override def beforeAll {
     dropTable
 
@@ -218,6 +222,146 @@ class StandardPartitionTableDropTestCase extends 
QueryTest with BeforeAndAfterAl
       Seq(Row(0)))
   }
 
+  test("dropping partition with moving data to trash") {
+    // Loads data.csv four times, drops every deptname partition with the trash
+    // feature enabled, and checks both the row count and that the partition
+    // folders are gone from the table path.
+    // NOTE(review): the trash folder contents are not verified here, and the
+    // property reset at the end is skipped if an assertion fails (consider try/finally).
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH, "true")
+    sql("drop table if exists dropPartition1")
+    sql(
+      """
+        | CREATE TABLE dropPartition1 (empno int, empname String, designation 
String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(40)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='Learning')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(32)))
+    sql(s"""ALTER TABLE dropPartition1 DROP 
PARTITION(deptname='configManagement')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(28)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='network')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(16)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='protocol')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(8)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='security')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(0)))
+    // every dropped partition folder must be physically removed from the table path
+    val table = CarbonEnv.getCarbonTable(Option("default"), 
"dropPartition1")(sqlContext
+      .sparkSession)
+    val tablePath = table.getTablePath
+    val deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=Learning")
+    }
+    assert(deptname.length == 0)
+    val configManagement = 
FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=configManagement")
+    }
+    assert(configManagement.length == 0)
+    val network = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=network")
+    }
+    assert(network.length == 0)
+    val protocol = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=protocol")
+    }
+    assert(protocol.length == 0)
+    val security = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=security")
+    }
+    assert(security.length == 0)
+    sql("drop table if exists dropPartition1")
+    // restore the default so later tests run with the trash feature disabled
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT)
+  }
+
+  test("dropping partition with moving data to trash and count check") {
+    // Drops two partitions with the trash feature enabled and checks that the
+    // partition folders are removed and that files exist under the table's trash folder.
+    // NOTE(review): the property reset at the end is skipped if an assertion fails.
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+      "true")
+    sql("drop table if exists dropPartition2")
+    sql(
+      """
+        | CREATE TABLE dropPartition2 (empno int, empname String, designation 
String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition2 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
dropPartition2 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(Option("default"), 
"dropPartition2")(sqlContext
+      .sparkSession)
+    val tablePath = table.getTablePath
+
+    // check the partition folders exist before dropping the partitions
+    var deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=Learning")
+    }
+    assert(deptname.length > 0)
+    var configManagement = 
FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=configManagement")
+    }
+    assert(configManagement.length > 0)
+    // check the partition folders are removed after dropping the partitions
+    sql(s"""ALTER TABLE dropPartition2 DROP PARTITION(deptname='Learning')""")
+    sql(s"""ALTER TABLE dropPartition2 DROP 
PARTITION(deptname='configManagement')""")
+
+    deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter {
+      file => file.getName.equalsIgnoreCase("deptname=Learning")
+    }
+    assert(deptname.length == 0)
+    configManagement = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=configManagement")
+    }
+    assert(configManagement.length == 0)
+    // check that files exist under the trash folder
+    val trashFolderPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR +
+                          CarbonTablePath.TRASH_DIR
+    assert(FileFactory.isFileExist(trashFolderPath))
+    // getFileCountInTrashFolder accumulates into the shared `count` field, so reset it first
+    count = 0
+    val list = getFileCountInTrashFolder(trashFolderPath)
+    // at least one file (of any type) was copied to the trash
+    assert(list > 0)
+    sql("drop table if exists dropPartition2")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT)
+  }
+
+  /**
+   * Recursively counts regular files under dirPath using java.io.File.
+   * NOTE(review): the count accumulates in the class-level `count` field and that
+   * field is returned, so callers must reset `count = 0` before calling; the return
+   * values of the nested recursive calls are ignored. A pure version returning the
+   * sum of child counts would remove the shared state.
+   */
+  def getFileCountInTrashFolder(dirPath: String) : Int = {
+    val fileName = new File(dirPath)
+    // listFiles() returns null when dirPath is missing or not a directory
+    val files = fileName.listFiles()
+    if (files != null) {
+      files.foreach(file => {
+        if (file.isFile) {
+          count = count + 1
+        }
+        if (file.isDirectory()) {
+          getFileCountInTrashFolder(file.getAbsolutePath())
+        }
+      })
+    }
+    count
+  }
+
   test("test dropping on partition table for int partition column") {
     sql(
       """

Reply via email to