This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new ecebee5 [CARBONDATA-4092] Fix concurrent issues in delete segment APIs and MV flow
ecebee5 is described below
commit ecebee58eb0e0e330dd758b1469cf01ef06b154c
Author: Vikram Ahuja <[email protected]>
AuthorDate: Fri Dec 18 17:07:36 2020 +0530
[CARBONDATA-4092] Fix concurrent issues in delete segment APIs and MV flow
Why is this PR needed?
There are multiple issues with the delete segment APIs:
The latest loadMetadataDetails are not used while writing to the table status file, so the table status entry of a concurrently loaded Insert In Progress/Success segment can be removed.
The code reads the table status file twice.
When concurrent queries both run checkAndReloadSchemas for MVs on all databases, they try to create a file at the same location; HDFS grants the lock to one and fails the other, which fails that query.
What changes were proposed in this PR?
Read the table status file only once.
Use the latest table status details when marking segments as Marked For Delete, so that concurrent operations no longer conflict.
Make the touchMDTFile and checkAndReloadSchemas methods synchronized, so that only one caller can execute them at a time (see the sketch below).
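As a rough illustration of how these pieces fit together, here is a minimal, self-contained
Java sketch. It is not the CarbonData implementation: in-process ReentrantLocks stand in for
the file-based ICarbonLock objects, and readTableStatus(), markForDelete(), mergeWithLatest()
and writeTableStatus() are hypothetical placeholders for the SegmentStatusManager helpers
changed in the diff below.

    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch only: shows the lock ordering, the single read of the table status file,
    // and the merge with the latest on-disk details before writing.
    public class DeleteSegmentSketch {

      private final ReentrantLock cleanFilesLock = new ReentrantLock();
      private final ReentrantLock tableStatusLock = new ReentrantLock();

      public void markSegmentsForDelete(List<String> loadIds) throws Exception {
        // Hold the clean files lock for the whole operation so a concurrent
        // clean files run cannot interleave with the status update.
        if (!cleanFilesLock.tryLock()) {
          throw new Exception("Not able to acquire clean files lock. Please try after some time.");
        }
        try {
          // Read the table status file only once for validation.
          String[] details = readTableStatus();
          markForDelete(details, loadIds);
          if (!tableStatusLock.tryLock()) {
            throw new Exception("Not able to acquire the table status lock. Please try again.");
          }
          try {
            // Merge with the latest on-disk details before writing, so entries of
            // concurrently loaded (Insert In Progress / Success) segments are kept.
            String[] latest = readTableStatus();
            writeTableStatus(mergeWithLatest(details, latest));
          } finally {
            tableStatusLock.unlock();
          }
        } finally {
          cleanFilesLock.unlock();
        }
      }

      // MV side: only one thread may reload schemas or touch the MDT file at a time.
      private synchronized boolean checkAndReloadSchemas() { return false; }
      private synchronized void touchMDTFile() { }

      // Hypothetical placeholders for the real table status helpers.
      private String[] readTableStatus() { return new String[0]; }
      private void markForDelete(String[] details, List<String> loadIds) { }
      private String[] mergeWithLatest(String[] current, String[] latest) { return latest; }
      private void writeTableStatus(String[] details) { }
    }

The real methods additionally hold the delete segment lock around the whole block and release
every lock in a finally clause, as the diff below shows.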
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #4059
---
.../core/statusmanager/SegmentStatusManager.java | 176 ++++++++++-----------
.../apache/carbondata/core/view/MVProvider.java | 12 +-
.../apache/carbondata/trash/DataTrashManager.scala | 10 ++
3 files changed, 105 insertions(+), 93 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index c062a02..4af4a55 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -427,6 +427,8 @@ public class SegmentStatusManager {
   public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
       List<String> loadIds, String tableFolderPath) throws Exception {
     CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
+    ICarbonLock carbonCleanFilesLock =
+        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK);
     ICarbonLock carbonDeleteSegmentLock =
         CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK);
     ICarbonLock carbonTableStatusLock =
@@ -437,46 +439,52 @@ public class SegmentStatusManager {
     try {
       if (carbonDeleteSegmentLock.lockWithRetries()) {
         LOG.info("Delete segment lock has been successfully acquired");
+        if (carbonCleanFilesLock.lockWithRetries()) {
+          LOG.info("Clean Files lock has been successfully acquired");
+          String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier
+              .getTablePath());
+          LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
+          if (!FileFactory.isFileExist(dataLoadLocation)) {
+            // log error.
+            LOG.error("Load metadata file is not present.");
+            return loadIds;
+          }
+          // read existing metadata details in load metadata.
+          listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
+          if (listOfLoadFolderDetailsArray.length != 0) {
+            updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
+            if (invalidLoadIds.isEmpty()) {
+              // All or None , if anything fails then don't write
+              if (carbonTableStatusLock.lockWithRetries()) {
+                LOG.info("Table status lock has been successfully acquired");
+                // To handle concurrency scenarios, always take latest metadata before writing
+                // into status file.
+                LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
+                writeLoadDetailsIntoFile(dataLoadLocation, updateLatestTableStatusDetails(
+                    listOfLoadFolderDetailsArray, latestLoadMetadataDetails).stream()
+                    .toArray(LoadMetadataDetails[]::new));
+              } else {
+                String errorMsg = "Delete segment by id is failed for " + tableDetails
+                    + ". Not able to acquire the table status lock due to other operation running "
+                    + "in the background.";
+                LOG.error(errorMsg);
+                throw new Exception(errorMsg + " Please try after some time.");
+              }
-        String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
-        if (!FileFactory.isFileExist(dataLoadLocation)) {
-          // log error.
-          LOG.error("Load metadata file is not present.");
-          return loadIds;
-        }
-        // read existing metadata details in load metadata.
-        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
-        if (listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
-          if (invalidLoadIds.isEmpty()) {
-            // All or None , if anything fails then don't write
-            if (carbonTableStatusLock.lockWithRetries()) {
-              LOG.info("Table status lock has been successfully acquired");
-              // To handle concurrency scenarios, always take latest metadata before writing
-              // into status file.
-              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
-              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
-                  latestLoadMetadataDetails);
-              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
-            }
-            else {
-              String errorMsg = "Delete segment by id is failed for " + tableDetails
-                  + ". Not able to acquire the table status lock due to other operation running "
-                  + "in the background.";
-              LOG.error(errorMsg);
-              throw new Exception(errorMsg + " Please try after some time.");
+            } else {
+              return invalidLoadIds;
             }
           } else {
-            return invalidLoadIds;
+            LOG.error("Delete segment by Id is failed. No matching segment id found.");
+            return loadIds;
           }
-
         } else {
-          LOG.error("Delete segment by Id is failed. No matching segment id found.");
-          return loadIds;
+          String errorMsg = "Delete segment by id is failed for " + tableDetails
+              + " as not able to acquire clean files lock.";
+          LOG.error(errorMsg);
+          throw new Exception(errorMsg + " Please try after some time.");
         }
-
       } else {
         String errorMsg = "Delete segment by id is failed for " + tableDetails
             + ". Not able to acquire the delete segment lock due to another delete "
@@ -490,6 +498,7 @@ public class SegmentStatusManager {
     } finally {
       CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
       CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
+      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK);
     }
     return invalidLoadIds;
@@ -505,6 +514,8 @@ public class SegmentStatusManager {
   public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
       String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
     CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
+    ICarbonLock carbonCleanFilesLock =
+        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK);
     ICarbonLock carbonDeleteSegmentLock =
         CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK);
     ICarbonLock carbonTableStatusLock =
@@ -515,49 +526,55 @@ public class SegmentStatusManager {
     try {
       if (carbonDeleteSegmentLock.lockWithRetries()) {
         LOG.info("Delete segment lock has been successfully acquired");
-
-        String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
-
-        if (!FileFactory.isFileExist(dataLoadLocation)) {
-          // Table status file is not present, maybe table is empty, ignore this operation
-          LOG.warn("Trying to update table metadata file which is not present.");
-          return invalidLoadTimestamps;
-        }
-        // read existing metadata details in load metadata.
-        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
-        if (listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray,
-              invalidLoadTimestamps, loadStartTime);
-          if (invalidLoadTimestamps.isEmpty()) {
-            if (carbonTableStatusLock.lockWithRetries()) {
-              LOG.info("Table status lock has been successfully acquired.");
-              // To handle concurrency scenarios, always take latest metadata before writing
-              // into status file.
-              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
-              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
-                  latestLoadMetadataDetails);
-              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
+        if (carbonCleanFilesLock.lockWithRetries()) {
+          LOG.info("Clean Files lock has been successfully acquired");
+          String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier
+              .getTablePath());
+          LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
+
+          if (!FileFactory.isFileExist(dataLoadLocation)) {
+            // Table status file is not present, maybe table is empty, ignore this operation
+            LOG.warn("Trying to update table metadata file which is not present.");
+            return invalidLoadTimestamps;
+          }
+          // read existing metadata details in load metadata.
+          listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
+          if (listOfLoadFolderDetailsArray.length != 0) {
+            updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray,
+                invalidLoadTimestamps, loadStartTime);
+            if (invalidLoadTimestamps.isEmpty()) {
+              if (carbonTableStatusLock.lockWithRetries()) {
+                LOG.info("Table status lock has been successfully acquired.");
+                // To handle concurrency scenarios, always take latest metadata before writing
+                // into status file.
+                LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
+                writeLoadDetailsIntoFile(dataLoadLocation, updateLatestTableStatusDetails(
+                    listOfLoadFolderDetailsArray, latestLoadMetadataDetails).stream()
+                    .toArray(LoadMetadataDetails[]::new));
+              } else {
+
+                String errorMsg = "Delete segment by date is failed for " + tableDetails
+                    + ". Not able to acquire the table status lock due to other operation running "
+                    + "in the background.";
+                LOG.error(errorMsg);
+                throw new Exception(errorMsg + " Please try after some time.");
+
+              }
+            } else {
+              return invalidLoadTimestamps;
             }
-            else {
-
-              String errorMsg = "Delete segment by date is failed for " + tableDetails
-                  + ". Not able to acquire the table status lock due to other operation running "
-                  + "in the background.";
-              LOG.error(errorMsg);
-              throw new Exception(errorMsg + " Please try after some time.");
-            }
           } else {
+            LOG.error("Delete segment by date is failed. No matching segment found.");
+            invalidLoadTimestamps.add(loadDate);
+            return invalidLoadTimestamps;
           }
-
         } else {
-          LOG.error("Delete segment by date is failed. No matching segment found.");
-          invalidLoadTimestamps.add(loadDate);
-          return invalidLoadTimestamps;
+          String errorMsg = "Delete segment by id is failed for " + tableDetails
+              + " as not able to acquire clean files lock.";
+          LOG.error(errorMsg);
+          throw new Exception(errorMsg + " Please try after some time.");
         }
-
       } else {
         String errorMsg = "Delete segment by date is failed for " + tableDetails
             + ". Not able to acquire the delete segment lock due to another delete "
@@ -571,6 +588,7 @@ public class SegmentStatusManager {
     } finally {
       CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
       CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
+      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK);
     }
     return invalidLoadTimestamps;
@@ -819,26 +837,6 @@ public class SegmentStatusManager {
     }
   }
 
-  /**
-   * This API will return the update status file name.
-   * @param segmentList
-   * @return
-   */
-  public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
-    if (segmentList.length == 0) {
-      return "";
-    }
-    else {
-      for (LoadMetadataDetails eachSeg : segmentList) {
-        // file name stored in 0th segment.
-        if (eachSeg.getLoadName().equalsIgnoreCase("0")) {
-          return eachSeg.getUpdateStatusFileName();
-        }
-      }
-    }
-    return "";
-  }
-
   public static class ValidAndInvalidSegmentsInfo {
     private final List<Segment> listOfValidSegments;
     private final List<Segment> listOfValidUpdatedSegments;
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index e21127f..9aa5cf3 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -429,8 +429,9 @@ public class MVProvider {
       }
       this.schemas.add(viewSchema);
       CarbonUtil.closeStreams(dataOutputStream, brWriter);
-      checkAndReloadSchemas(viewManager, true);
-      touchMDTFile();
+      if (!checkAndReloadSchemas(viewManager, true)) {
+        touchMDTFile();
+      }
     }
   }
@@ -523,7 +524,7 @@ public class MVProvider {
     LOG.info(String.format("Materialized view %s schema is deleted", viewName));
   }
 
-  private void checkAndReloadSchemas(MVManager viewManager, boolean touchFile)
+  private synchronized boolean checkAndReloadSchemas(MVManager viewManager, boolean touchFile)
       throws IOException {
     if (FileFactory.isFileExist(this.schemaIndexFilePath)) {
       long lastModifiedTime =
@@ -531,16 +532,19 @@ public class MVProvider {
       if (this.lastModifiedTime != lastModifiedTime) {
         this.schemas = this.retrieveAllSchemasInternal(viewManager);
         touchMDTFile();
+        return true;
       }
     } else {
       this.schemas = this.retrieveAllSchemasInternal(viewManager);
       if (touchFile) {
         touchMDTFile();
+        return true;
       }
     }
+    return false;
   }
 
-  private void touchMDTFile() throws IOException {
+  private synchronized void touchMDTFile() throws IOException {
     if (!FileFactory.isFileExist(this.systemDirectory)) {
       FileFactory.createDirectoryAndSetPermission(this.systemDirectory,
           new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
index 75c4751..cadc43b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
@@ -57,13 +57,20 @@ object DataTrashManager {
       throw new RuntimeException("Clean files with force operation not permitted by default")
     }
     var carbonCleanFilesLock: ICarbonLock = null
+    var carbonDeleteSegmentLock : ICarbonLock = null
     try {
       val errorMsg = "Clean files request is failed for " +
         s"${ carbonTable.getQualifiedName }" +
         ". Not able to acquire the clean files lock due to another clean files " +
         "operation is running in the background."
+      val deleteSegmentErrorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the delete segment lock due to another delete segment " +
+        "operation running in the background."
       carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      carbonDeleteSegmentLock = CarbonLockUtil.getLockObject(carbonTable
+        .getAbsoluteTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK, deleteSegmentErrorMsg)
       // step 1: check and clean trash folder
       checkAndCleanTrashFolder(carbonTable, isForceDelete)
       // step 2: move stale segments which are not exists in metadata into .Trash
@@ -74,6 +81,9 @@ object DataTrashManager {
       if (carbonCleanFilesLock != null) {
         CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
       }
+      if (carbonDeleteSegmentLock != null) {
+        CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK)
+      }
     }
   }