akashrn5 commented on code in PR #4276:
URL: https://github.com/apache/carbondata/pull/4276#discussion_r918022752


##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -106,6 +108,32 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+          val isPartitionDataTrashEnabled = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+              CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT).toBoolean
+          if (isPartitionDataTrashEnabled) {
+            // move  the partition files to trash folder which are dropped
+            val droppedPartitionNames = partitions.map { partition =>
+              partition.spec.map { specs => specs._1 + "=" + specs._2 }
+            }
+            val timeStamp = CarbonUpdateUtil.readCurrentTime();
+            for (index <- droppedPartitionNames.indices) {

Review Comment:
   Instead of a for loop over the indices, you can use the `zipWithIndex` function if the index is required.



##########
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java:
##########
@@ -2886,5 +2886,14 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_ENABLE_MULTI_VERSION_TABLE_STATUS_DEFAULT = "false";
 
+  /**
+   * This flag is set to enable the feature of moving the partition data to trash
+   * when dropped partition command is given.
+   * By default it is enabled if any performance degrade then user should disable this feature.
+   */
+  @CarbonProperty
+  public static final String CARBON_ENABLE_PARTITION_DATA_TRASH =

Review Comment:
   Please update the documentation as well, and add an FAQ entry, because the comment here says users can disable the feature if performance is impacted.



##########
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java:
##########
@@ -206,10 +206,12 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
   /**
    * This method will delete all the empty partition folders starting from the table path
    */
-  private static void deleteEmptyPartitionFoldersRecursively(CarbonFile tablePath) {
+  public static void deleteEmptyPartitionFoldersRecursively(CarbonFile tablePath) {
     CarbonFile[] listOfFiles = tablePath.listFiles();
     if (listOfFiles.length == 0) {
       tablePath.delete();
+      // if parent file folder also empty then delete that too.
+      deleteEmptyPartitionFoldersRecursively(tablePath.getParentFile());

Review Comment:
   This may delete the database folder as well, right? Why add this here?
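
   One way to avoid climbing past the table folder is to pass an explicit boundary and stop there. The following is only a sketch of that idea using `java.nio.file` instead of `CarbonFile`; the class name, method name, and the `stopAt` parameter are hypothetical and not part of this PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class EmptyFolderCleanup {

  // Hypothetical helper: deletes `folder` if it is empty, then walks upward,
  // but never deletes `stopAt` (e.g. the table path) or anything above it.
  static void deleteEmptyFoldersUpTo(Path folder, Path stopAt) throws IOException {
    Path current = folder.toAbsolutePath().normalize();
    Path boundary = stopAt.toAbsolutePath().normalize();
    while (current != null && current.startsWith(boundary) && !current.equals(boundary)) {
      if (!Files.isDirectory(current) || !isEmpty(current)) {
        return; // a non-empty folder ends the upward walk
      }
      Files.delete(current);
      current = current.getParent();
    }
  }

  private static boolean isEmpty(Path dir) throws IOException {
    // Files.list must be closed, hence try-with-resources
    try (Stream<Path> entries = Files.list(dir)) {
      return !entries.findAny().isPresent();
    }
  }
}
```

   With a boundary like this, an empty table would keep its table folder, and the database folder would never be touched.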



##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -106,6 +108,32 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+          val isPartitionDataTrashEnabled = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+              CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT).toBoolean
+          if (isPartitionDataTrashEnabled) {
+            // move  the partition files to trash folder which are dropped
+            val droppedPartitionNames = partitions.map { partition =>
+              partition.spec.map { specs => specs._1 + "=" + specs._2 }

Review Comment:
   Do not hard-code the `=` separator; please use `org.apache.carbondata.core.constants.CarbonCommonConstants#EQUALS`.
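
   As a rough illustration of building the names from constants rather than literals, here is a standalone Java sketch. The local `EQUALS` and `FILE_SEPARATOR` fields are stand-ins for the `CarbonCommonConstants` values (assumed here to be `"="` and `"/"`), and joining the key/value pairs with the file separator is also an assumption, since the hunk above does not show how the pairs are combined.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class PartitionNameSketch {

  // Stand-ins for CarbonCommonConstants.EQUALS and FILE_SEPARATOR (values assumed).
  static final String EQUALS = "=";
  static final String FILE_SEPARATOR = "/";

  // Hypothetical helper: turns an ordered partition spec into a folder name,
  // e.g. {country=IN, year=2022} becomes "country=IN/year=2022".
  static String partitionFolderName(Map<String, String> spec) {
    return spec.entrySet().stream()
        .map(e -> e.getKey() + EQUALS + e.getValue())
        .collect(Collectors.joining(FILE_SEPARATOR));
  }
}
```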



##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -106,6 +108,32 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+          val isPartitionDataTrashEnabled = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+              CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT).toBoolean
+          if (isPartitionDataTrashEnabled) {
+            // move  the partition files to trash folder which are dropped
+            val droppedPartitionNames = partitions.map { partition =>
+              partition.spec.map { specs => specs._1 + "=" + specs._2 }
+            }
+            val timeStamp = CarbonUpdateUtil.readCurrentTime();

Review Comment:
   Please use `System.currentTimeMillis()` directly; there is no need to call a method for one line.



##########
core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java:
##########
@@ -268,4 +268,45 @@ public static String getCompleteTrashFolderPath(String tablePath, long timeStamp
       timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
       .SEGMENT_PREFIX + segmentNumber;
   }
+
+  /**
+   * This will give the complete path of the trash folder with the timestamp and the partition name
+   *
+   * @param tablePath          absolute table path
+   * @param timeStampSubFolder the timestamp for the clean files operation
+   * @param partitionName      partition name for which files are moved to the trash folder
+   */
+  public static String getCompleteTrashFolderPathForPartition(String tablePath,
+      long timeStampSubFolder, String partitionName) {
+    return CarbonTablePath.getTrashFolderPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+        + timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR
+        + partitionName;
+  }
+
+  /**
+   * The below method copies dropped partition files to the trash folder.
+   *
+   * @param filesToCopy              absolute path of the files to copy to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   */
+  public static void copyFilesToTrash(String filesToCopy, String trashFolderWithTimestamp) {

Review Comment:
   Why are separate methods required for handling partition tables? Does the existing method not handle this case? If so, change the method name to be specific to partition tables so that no one reuses it by mistake.



##########
core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java:
##########
@@ -268,4 +268,45 @@ public static String getCompleteTrashFolderPath(String tablePath, long timeStamp
       timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
       .SEGMENT_PREFIX + segmentNumber;
   }
+
+  /**
+   * This will give the complete path of the trash folder with the timestamp and the partition name
+   *
+   * @param tablePath          absolute table path
+   * @param timeStampSubFolder the timestamp for the clean files operation
+   * @param partitionName      partition name for which files are moved to the trash folder
+   */
+  public static String getCompleteTrashFolderPathForPartition(String tablePath,
+      long timeStampSubFolder, String partitionName) {
+    return CarbonTablePath.getTrashFolderPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+        + timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR
+        + partitionName;
+  }
+
+  /**
+   * The below method copies dropped partition files to the trash folder.
+   *
+   * @param filesToCopy              absolute path of the files to copy to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   */
+  public static void copyFilesToTrash(String filesToCopy, String trashFolderWithTimestamp) {
+    try {
+      if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+        FileFactory.mkdirs(trashFolderWithTimestamp);
+      }
+      // check if file exists before copying
+      if (FileFactory.isFileExist(filesToCopy)) {
+        CarbonFile folder = FileFactory.getCarbonFile(filesToCopy);
+        CarbonFile[] dataFiles = folder.listFiles();
+        for (CarbonFile carbonFile : dataFiles) {
+          copyFileToTrashFolder(carbonFile.getAbsolutePath(), trashFolderWithTimestamp);
+        }
+      } else {
+        LOGGER.warn("Folder not copied to trash as partition folder does not exist");
+      }
+    } catch (IOException e) {
+      // If file is already moved or not found then continue with other files
+      LOGGER.warn("Unable to copy file to trash folder as file already moved or not found.", e);

Review Comment:
   It is better to identify the actual failure and give a proper error message rather than leaving users in an ambiguous state.
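
   To illustrate what more specific messages could look like, here is a sketch built on `java.nio.file` rather than `FileFactory`. The class and method names are hypothetical, and the method returns the message as a string only so the sketch stays self-contained; in the real code the message would go to `LOGGER`.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

final class TrashCopySketch {

  // Hypothetical helper: copies one file into the trash folder and reports
  // which outcome occurred instead of a single catch-all message.
  static String copyToTrash(Path source, Path trashDir) {
    try {
      Files.createDirectories(trashDir);
      Files.copy(source, trashDir.resolve(source.getFileName()));
      return "Copied " + source + " to " + trashDir;
    } catch (NoSuchFileException e) {
      // the source disappeared, e.g. it was already moved or cleaned
      return "Source file not found, it may already have been moved: " + source;
    } catch (FileAlreadyExistsException e) {
      // Files.copy without REPLACE_EXISTING refuses to overwrite
      return "File already present in trash folder, skipping: " + e.getFile();
    } catch (IOException e) {
      // anything else is a real failure the user should see as such
      return "Failed to copy " + source + " to trash: " + e;
    }
  }
}
```

   Separating the cases this way lets the "already moved" situation stay a warning while unexpected I/O failures can be raised or logged as errors.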



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.