vikramahuja1001 commented on a change in pull request #3917: URL: https://github.com/apache/carbondata/pull/3917#discussion_r509135611
########## File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.path; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.commons.io.FileUtils; + +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.apache.log4j.Logger; + +public final class TrashUtil { + + /** + * Attribute for Carbon LOGGER + */ + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + private TrashUtil() { + + } + + public static void copyDataToTrashFolder(String carbonTablePath, String pathOfFileToCopy, + String suffixToAdd) throws IOException { + String trashFolderPath = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + + 
CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME + CarbonCommonConstants.FILE_SEPARATOR + + suffixToAdd; + try { + if (new File(pathOfFileToCopy).exists()) { + if (!FileFactory.isFileExist(trashFolderPath)) { + LOGGER.info("Creating Trash folder at:" + trashFolderPath); + FileFactory.createDirectoryAndSetPermission(trashFolderPath, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + } + FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), Review comment: Using copy instead of move because, if anything crashes while files are being moved, they cannot be recovered. So all the files of a segment are copied first, and the originals are deleted only after the copy has succeeded. ########## File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ########## @@ -152,6 +123,41 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, } } + public static HashMap<CarbonFile, String> getStaleSegments(LoadMetadataDetails[] details, Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala ########## @@ -45,9 +45,11 @@ case class CarbonTruncateCommand(child: TruncateTableCommand) extends DataComman throw new MalformedCarbonCommandException( "Unsupported truncate table with specified partition") } + val optionList = List.empty[(String, String)] + CarbonCleanFilesCommand( databaseNameOp = Option(dbName), - tableName = Option(tableName), + tableName = Option(tableName), Option(optionList), Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala ########## @@ -45,9 +45,11 @@ case class CarbonTruncateCommand(child: TruncateTableCommand) extends DataComman throw new MalformedCarbonCommandException( "Unsupported truncate table with specified partition") } + val optionList = List.empty[(String, String)] Review comment: done ########## File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ########## @@ -108,7 +108,7 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String], // Delete stale segment folders that are not in table status but are physically present in // the Fact folder LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") - TableProcessingOperations.deletePartialLoadDataIfExist(table, false) + // TableProcessingOperations.deletePartialLoadDataIfExist(table, false) Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala ########## @@ -90,7 +90,7 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String], // Delete stale segment folders that are not in table status but are physically present in // the Fact folder LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") - TableProcessingOperations.deletePartialLoadDataIfExist(table, false) + // TableProcessingOperations.deletePartialLoadDataIfExist(table, false) Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.path; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.commons.io.FileUtils; + +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.apache.log4j.Logger; + +public final class TrashUtil { + + /** + * Attribute for Carbon LOGGER + */ + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonUtil.class.getName()); + + private TrashUtil() { + + } + + public static void copyDataToTrashFolder(String carbonTablePath, String pathOfFileToCopy, + String suffixToAdd) throws IOException { + String trashFolderPath = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + + CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME + CarbonCommonConstants.FILE_SEPARATOR + + suffixToAdd; + try { + if (new File(pathOfFileToCopy).exists()) { + if (!FileFactory.isFileExist(trashFolderPath)) { Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment, * If partition specs are null, then directly delete parent directory in locationMap. 
*/ private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs, - Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) { + Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath, + String segmentNo, SegmentStatus segmentStatus) + throws IOException { for (String indexOrMergeFile : indexOrMergeFiles) { if (null != partitionSpecs) { Path location = new Path(indexOrMergeFile); boolean exists = pathExistsInPartitionSpec(partitionSpecs, location); if (!exists) { + // move to trash + TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), CarbonCommonConstants + .LOAD_FOLDER + segmentNo + CarbonCommonConstants.FILE_SEPARATOR + + location.toString().substring(tablePath.length() + 1, + location.toString().length()).split("/")[0]); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment, * If partition specs are null, then directly delete parent directory in locationMap. 
*/ private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs, - Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) { + Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath, + String segmentNo, SegmentStatus segmentStatus) + throws IOException { for (String indexOrMergeFile : indexOrMergeFiles) { if (null != partitionSpecs) { Path location = new Path(indexOrMergeFile); boolean exists = pathExistsInPartitionSpec(partitionSpecs, location); if (!exists) { + // move to trash + TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), CarbonCommonConstants + .LOAD_FOLDER + segmentNo + CarbonCommonConstants.FILE_SEPARATOR + + location.toString().substring(tablePath.length() + 1, + location.toString().length()).split("/")[0]); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName, + SegmentStatus segmentStatus, Boolean isPartitionTable) + throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); - List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, - FileFactory.getConfiguration()); + List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, + true, FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. 
Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName, + SegmentStatus segmentStatus, Boolean isPartitionTable) + throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); - List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, - FileFactory.getConfiguration()); + List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, + true, FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. 
+ if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) { + if (!isPartitionTable) { + TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants + .LOAD_FOLDER + segment.getSegmentNo()); + } else { + TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants + .LOAD_FOLDER + segment.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR + + entry.getKey().substring(tablePath.length() + 1, entry.getKey().length())); Review comment: changed the logic ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName, + SegmentStatus segmentStatus, Boolean isPartitionTable) + throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); - List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, - FileFactory.getConfiguration()); + List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, + true, FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. 
+ if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) { + if (!isPartitionTable) { + TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants + .LOAD_FOLDER + segment.getSegmentNo()); + } else { + TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants + .LOAD_FOLDER + segment.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR + + entry.getKey().substring(tablePath.length() + 1, entry.getKey().length())); + } + } FileFactory.deleteFile(entry.getKey()); for (String file : entry.getValue()) { String[] deltaFilePaths = updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo()); for (String deltaFilePath : deltaFilePaths) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio */ public static void deleteSegment(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs, - SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName, + SegmentStatus segmentStatus, Boolean isPartitionTable) + throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); - List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, - FileFactory.getConfiguration()); + List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, + true, FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. 
+ if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) { + if (!isPartitionTable) { + TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants + .LOAD_FOLDER + segment.getSegmentNo()); + } else { + TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants + .LOAD_FOLDER + segment.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR + + entry.getKey().substring(tablePath.length() + 1, entry.getKey().length())); + } + } FileFactory.deleteFile(entry.getKey()); for (String file : entry.getValue()) { String[] deltaFilePaths = updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo()); for (String deltaFilePath : deltaFilePaths) { + // If the file to be deleted is a carbondata file, copy that file to the trash folder. + if (segmentStatus == SegmentStatus + .INSERT_IN_PROGRESS) { + TrashUtil.copyDataToTrashFolder(tablePath, deltaFilePath, deltaFilePath + .substring(tablePath.length() + 1, deltaFilePath.length())); + } FileFactory.deleteFile(deltaFilePath); } + // If the file to be deleted is a carbondata file, copy that file to the trash folder. + if (file.endsWith(CarbonCommonConstants.FACT_FILE_EXT) && segmentStatus == + SegmentStatus.INSERT_IN_PROGRESS) { Review comment: This branch will always be hit, because the carbondata file is in the map. ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1057,7 +1058,7 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio partitionSpecs, fileStore.getIndexFilesMap(), indexOrMergeFiles, - table.getTablePath()); + table.getTablePath(), segment.getLoadName(), segment.getSegmentStatus()); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ########## @@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment, * If partition specs are null, then directly delete parent directory in locationMap.
*/ private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs, - Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) { + Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath, + String segmentNo, SegmentStatus segmentStatus) Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws }); } } + + public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad, + AbsoluteTableIdentifier absoluteTableIdentifier) { Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws }); } } + + public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad, + AbsoluteTableIdentifier absoluteTableIdentifier) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + boolean canBeDeleted = false; Review comment: ok ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws }); } } + + public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad, + AbsoluteTableIdentifier absoluteTableIdentifier) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + boolean canBeDeleted = false; + try { + if (segmentLock.lockWithRetries(1, 5)) { Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3440,4 +3442,25 @@ public static void 
agingTempFolderForIndexServer(long agingTime)throws }); } } + + public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad, + AbsoluteTableIdentifier absoluteTableIdentifier) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + boolean canBeDeleted = false; + try { + if (segmentLock.lockWithRetries(1, 5)) { + LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName() + "It " + Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws }); } } + + public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad, + AbsoluteTableIdentifier absoluteTableIdentifier) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + boolean canBeDeleted = false; + try { + if (segmentLock.lockWithRetries(1, 5)) { + LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName() + "It " + + "can be deleted as load is not going on"); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ########## @@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws }); } } + + public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad, + AbsoluteTableIdentifier absoluteTableIdentifier) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK); + boolean canBeDeleted = false; + try { + if (segmentLock.lockWithRetries(1, 5)) { + LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName() + "It " + + "can be deleted as load is not going on"); 
+ canBeDeleted = true; + } else { + LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName()); + canBeDeleted = false; + } + } finally { + segmentLock.unlock(); + LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released"); Review comment: done ########## File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java ########## @@ -138,8 +143,17 @@ public boolean accept(CarbonFile file) { if (filesToBeDeleted.length == 0) { status = true; } else { - for (CarbonFile eachFile : filesToBeDeleted) { + // If the file to be deleted is a carbondata file, copy that file to the trash + // folder. + if (eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) && + oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) { + TrashUtil.copyDataToTrashFolder(carbonTable.getTablePath(), + eachFile.getAbsolutePath(), eachFile.getAbsolutePath() + .substring(carbonTable.getTablePath().length() + 1, + eachFile.getAbsolutePath().length())); Review comment: changed logic ########## File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java ########## @@ -138,8 +143,17 @@ public boolean accept(CarbonFile file) { if (filesToBeDeleted.length == 0) { status = true; } else { - for (CarbonFile eachFile : filesToBeDeleted) { + // If the file to be deleted is a carbondata file, copy that file to the trash + // folder. + if (eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) && Review comment: added code for that ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org