Repository: carbondata Updated Branches: refs/heads/master 241b2657a -> 12ccf708f
[CARBONDATA-2061] Check for only valid IN_PROGRESS segments Problem: During operations like drop, delete segment, compaction and IUD, there is a check for IN_PROGRESS segments of the table. This check only reads the tblstatus file, so a stale IN_PROGRESS entry is treated the same as a load that is still running. Solution: The check should validate each IN_PROGRESS segment by trying to acquire its segment lock, and so distinguish valid IN_PROGRESS segments from invalid (stale) ones. This closes #1844 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/12ccf708 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/12ccf708 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/12ccf708 Branch: refs/heads/master Commit: 12ccf708f7b10fd9c7667aa37dc04938e76f36c6 Parents: 241b265 Author: dhatchayani <[email protected]> Authored: Mon Jan 22 14:42:07 2018 +0530 Committer: kunal642 <[email protected]> Committed: Mon Jan 29 13:50:49 2018 +0530 ---------------------------------------------------------------------- .../statusmanager/SegmentStatusManager.java | 61 ++++++++++++++------ .../processing/util/CarbonLoaderUtil.java | 8 ++- 2 files changed, 50 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/12ccf708/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index e1fadcf..6af0304 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -292,7 +292,7 @@ public class SegmentStatusManager { // read existing metadata details in load metadata. 
listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); if (listOfLoadFolderDetailsArray.length != 0) { - updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds); + updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds); if (invalidLoadIds.isEmpty()) { // All or None , if anything fails then dont write if (carbonTableStatusLock.lockWithRetries()) { @@ -376,8 +376,8 @@ public class SegmentStatusManager { // read existing metadata details in load metadata. listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); if (listOfLoadFolderDetailsArray.length != 0) { - updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps, - loadStartTime); + updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray, + invalidLoadTimestamps, loadStartTime); if (invalidLoadTimestamps.isEmpty()) { if (carbonTableStatusLock.lockWithRetries()) { LOG.info("Table status lock has been successfully acquired."); @@ -471,8 +471,9 @@ public class SegmentStatusManager { * @param invalidLoadIds * @return invalidLoadIds */ - private static List<String> updateDeletionStatus(List<String> loadIds, - LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) { + private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier, + List<String> loadIds, LoadMetadataDetails[] listOfLoadFolderDetailsArray, + List<String> invalidLoadIds) { SegmentStatus segmentStatus = null; for (String loadId : loadIds) { boolean loadFound = false; @@ -488,12 +489,14 @@ public class SegmentStatusManager { LOG.error("Cannot delete the Segment which is compacted. 
Segment is " + loadId); invalidLoadIds.add(loadId); return invalidLoadIds; - } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus) { + } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus + && checkIfValidLoadInProgress(absoluteTableIdentifier, loadId)) { // if the segment status is in progress then no need to delete that. LOG.error("Cannot delete the segment " + loadId + " which is load in progress"); invalidLoadIds.add(loadId); return invalidLoadIds; - } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus) { + } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus + && checkIfValidLoadInProgress(absoluteTableIdentifier, loadId)) { // if the segment status is overwrite in progress, then no need to delete that. LOG.error("Cannot delete the segment " + loadId + " which is load overwrite " + "in progress"); @@ -531,9 +534,9 @@ public class SegmentStatusManager { * @param invalidLoadTimestamps * @return invalidLoadTimestamps */ - public static List<String> updateDeletionStatus(String loadDate, - LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadTimestamps, - Long loadStartTime) { + public static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier, + String loadDate, LoadMetadataDetails[] listOfLoadFolderDetailsArray, + List<String> invalidLoadTimestamps, Long loadStartTime) { // For each load timestamp loop through data and if the // required load timestamp is found then mark // the metadata as deleted. 
@@ -550,14 +553,19 @@ public class SegmentStatusManager { } else if (SegmentStatus.STREAMING == segmentStatus) { LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() + "as the segment is streaming in progress."); - } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus && - SegmentStatus.INSERT_IN_PROGRESS != segmentStatus && - SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != segmentStatus) { + } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus && checkIfValidLoadInProgress( + absoluteTableIdentifier, loadMetadata.getLoadName())) { + LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() + + "as the segment is insert in progress."); + } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus + && checkIfValidLoadInProgress(absoluteTableIdentifier, loadMetadata.getLoadName())) { + LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() + + "as the segment is insert overwrite in progress."); + } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) { loadFound = true; updateSegmentMetadataDetails(loadMetadata); - LOG.info("Info: " + - loadStartTimeString + loadMetadata.getLoadStartTime() + - " Marked for Delete"); + LOG.info("Info: " + loadStartTimeString + loadMetadata.getLoadStartTime() + + " Marked for Delete"); } } } @@ -700,11 +708,30 @@ public class SegmentStatusManager { SegmentStatus segmentStatus = loaddetail.getSegmentStatus(); if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { - loadInProgress = true; + loadInProgress = + checkIfValidLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), + loaddetail.getLoadName()); } } } return loadInProgress; } + /** + * This method will check for valid IN_PROGRESS segments. 
+ * Tries to acquire a lock on the segment and decide on the stale segments + * @param absoluteTableIdentifier + * + */ + public static Boolean checkIfValidLoadInProgress(AbsoluteTableIdentifier absoluteTableIdentifier, + String loadId) { + ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, + CarbonTablePath.addSegmentPrefix(loadId) + LockUsage.LOCK); + try { + return !segmentLock.lockWithRetries(1, 0); + } finally { + segmentLock.unlock(); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/12ccf708/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 1991017..fdc2cc3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -199,10 +199,14 @@ public final class CarbonLoaderUtil { // 2. 
If load or insert into operation is in progress and insert overwrite operation // is triggered for (LoadMetadataDetails entry : listOfLoadFolderDetails) { - if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) { + if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS + && segmentStatusManager.checkIfValidLoadInProgress( + absoluteTableIdentifier, entry.getLoadName())) { throw new RuntimeException("Already insert overwrite is in progress"); } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS - && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) { + && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS + && segmentStatusManager.checkIfValidLoadInProgress( + absoluteTableIdentifier, entry.getLoadName())) { throw new RuntimeException("Already insert into or load is in progress"); } }
