This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3c4c05d [CARBONDATA-3900][CARBONDATA-3882][CARBONDATA-3881] Fix multiple concurrent issues in table status lock and segment lock for SI and maintable
3c4c05d is described below
commit 3c4c05d4f28321f571f6f05b013bf7d3507dc4bb
Author: ajantha-bhat <[email protected]>
AuthorDate: Fri Jun 26 19:59:59 2020 +0530
[CARBONDATA-3900][CARBONDATA-3882][CARBONDATA-3881] Fix multiple concurrent issues in table status lock and segment lock for SI and maintable
Why is this PR needed?
[CARBONDATA-3900] Fix main table load failure in a concurrent load and compaction scenario
In the main table load flow, the segment lock was released before the table status was updated to success.
So a concurrent operation considered this segment stale (since its segment lock was no longer held) and cleaned it up, leading to an "unable to get file status" exception.
[CARBONDATA-3882] Fix a wrong lock and a missing table status lock in some SI flows
In updateLoadMetadataWithMergeStatus, we want to update the SI table status, but the lock was acquired on the main table.
In triggerCompaction and updateTableStatusForIndexTables, the table status write was happening without any lock.
[CARBONDATA-3881] Fix a concurrent main table compaction and SI load issue
Consider a scenario where segmentX was loaded to the main table but failed to load to the SI table. While loading another segmentY, we reload the failed SI segmentX. If segmentX has meanwhile been compacted in the main table and clean files has been executed on it, the SI load fails: the segment id is not found in the segmentMap of the SI, and an exception is thrown.
What changes were proposed in this PR?
for [CARBONDATA-3900]
Release the segment lock only after the main table status is updated (see the sketch below).
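As an illustration, here is a minimal Scala sketch of the corrected lock scoping; loadSegmentData and writeTableStatus are hypothetical placeholders, not CarbonData APIs:

import org.apache.carbondata.core.locks.ICarbonLock

def loadWithSegmentLock(segmentLock: ICarbonLock)
    (loadSegmentData: () => Unit, writeTableStatus: () => Unit): Unit = {
  try {
    loadSegmentData()   // write the segment's data files
    writeTableStatus()  // commit the segment in the table status file
  } finally {
    // Release only after the status write: a concurrent operation that sees
    // neither the segment lock nor a table status entry would otherwise
    // treat the in-flight segment as stale and clean it up.
    segmentLock.unlock()
  }
}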
for [CARBONDATA-3882]
In updateLoadMetadataWithMergeStatus, take the table status lock on the SI table.
In triggerCompaction and updateTableStatusForIndexTables, add a lock around the table status write (see the sketch below).
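A minimal sketch of this locking pattern, using the SegmentStatusManager lock API that also appears in the diff below; writeIndexTableStatus is a hypothetical placeholder for the actual status write:

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager

def updateIndexTableStatus(indexTable: CarbonTable)
    (writeIndexTableStatus: () => Unit): Unit = {
  // Take the table status lock on the table whose status file is written
  // (the SI table), not on the main table.
  val statusLock =
    new SegmentStatusManager(indexTable.getAbsoluteTableIdentifier).getTableStatusLock
  try {
    if (statusLock.lockWithRetries()) {
      writeIndexTableStatus()
    }
  } finally {
    statusLock.unlock()
  }
}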
for [CARBONDATA-3881]
Just before reloading a failed SI segment, check that it is still a valid segment in the main table (see the sketch below).
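A minimal self-contained sketch of the validity check; isValidInMainTable stands in for the actual main table segment lookup:

def selectSegmentsToReload(failedSiSegments: Seq[String],
    validSiSegments: Set[String],
    isValidInMainTable: String => Boolean): Seq[String] = {
  failedSiSegments.filter { segmentId =>
    // Skip segments that are already valid in the SI, and skip segments the
    // main table no longer considers valid (e.g. compacted and then cleaned),
    // so the reload cannot hit a missing entry in the SI segmentMap.
    !validSiSegments.contains(segmentId) && isValidInMainTable(segmentId)
  }
}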
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3810
---
.../spark/rdd/CarbonDataRDDFactory.scala | 361 +++++++++++----------
.../apache/spark/sql/index/CarbonIndexUtil.scala | 12 +-
.../load/CarbonInternalLoaderUtil.java | 28 +-
.../spark/sql/secondaryindex/load/Compactor.scala | 2 -
.../secondaryindex/util/SecondaryIndexUtil.scala | 82 +++--
5 files changed, 256 insertions(+), 229 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b049373..daa4ca2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -482,88 +482,77 @@ object CarbonDataRDDFactory {
errorMessage = errorMsgLocal
LOGGER.info(errorMessage)
LOGGER.error(ex)
- } finally {
- segmentLock.unlock()
}
- // handle the status file updation for the update cmd.
- if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
- if (loadStatus == SegmentStatus.LOAD_FAILURE) {
- CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
- return null
- } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
- updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
- carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
- return null
- } else {
- // in success case handle updation of the table status file.
- // success case.
- val segmentDetails = new util.HashSet[Segment]()
- var resultSize = 0
- res.foreach { resultOfSeg =>
- resultSize = resultSize + resultOfSeg.size
- resultOfSeg.foreach { resultOfBlock =>
- segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
- }
- }
- var segmentMetaDataInfoMap = scala.collection.mutable.Map.empty[String, SegmentMetaDataInfo]
- if (!segmentMetaDataAccumulator.isZero) {
- segmentMetaDataAccumulator.value.asScala.foreach(map => if (map.nonEmpty) {
- segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map
- })
- }
- val segmentFiles = updateSegmentFiles(carbonTable,
- segmentDetails,
- updateModel.get,
- segmentMetaDataInfoMap.asJava)
-
- // this means that the update doesnt have any records to update so no need to do table
- // status file updation.
- if (resultSize == 0) {
+ try {
+ // handle the status file updation for the update cmd.
+ if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
+ if (loadStatus == SegmentStatus.LOAD_FAILURE) {
+ CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
return null
- }
- if (!CarbonUpdateUtil.updateTableMetadataStatus(
- segmentDetails,
- carbonTable,
- updateModel.get.updatedTimeStamp + "",
- true,
- new util.ArrayList[Segment](0),
- new util.ArrayList[Segment](segmentFiles), "")) {
- LOGGER.error("Data update failed due to failure in table status
updation.")
- updateModel.get.executorErrors.errorMsg = errorMessage
- updateModel.get.executorErrors.failureCauses = FailureCauses
- .STATUS_FILE_UPDATION_FAILURE
+ } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
+ updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
+ carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
return null
+ } else {
+ // in success case handle updation of the table status file.
+ // success case.
+ val segmentDetails = new util.HashSet[Segment]()
+ var resultSize = 0
+ res.foreach { resultOfSeg =>
+ resultSize = resultSize + resultOfSeg.size
+ resultOfSeg.foreach { resultOfBlock =>
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
+ }
+ }
+ var segmentMetaDataInfoMap = scala
+ .collection
+ .mutable
+ .Map
+ .empty[String, SegmentMetaDataInfo]
+ if (!segmentMetaDataAccumulator.isZero) {
+ segmentMetaDataAccumulator.value.asScala.foreach(map => if (map.nonEmpty) {
+ segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map
+ })
+ }
+ val segmentFiles = updateSegmentFiles(carbonTable,
+ segmentDetails,
+ updateModel.get,
+ segmentMetaDataInfoMap.asJava)
+
+ // this means that the update doesnt have any records to update so no need to do table
+ // status file updation.
+ if (resultSize == 0) {
+ return null
+ }
+ if (!CarbonUpdateUtil.updateTableMetadataStatus(
+ segmentDetails,
+ carbonTable,
+ updateModel.get.updatedTimeStamp + "",
+ true,
+ new util.ArrayList[Segment](0),
+ new util.ArrayList[Segment](segmentFiles), "")) {
+ LOGGER.error("Data update failed due to failure in table status
updation.")
+ updateModel.get.executorErrors.errorMsg = errorMessage
+ updateModel.get.executorErrors.failureCauses = FailureCauses
+ .STATUS_FILE_UPDATION_FAILURE
+ return null
+ }
+ // code to handle Pre-Priming cache for update command
+ if (!segmentFiles.isEmpty) {
+ val segmentsToPrePrime = segmentFiles
+ .asScala
+ .map(iterator => iterator.getSegmentNo)
+ .toSeq
+ DistributedRDDUtils
+ .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
+ operationContext, hadoopConf, segmentsToPrePrime.toList)
+ }
}
- // code to handle Pre-Priming cache for update command
- if (!segmentFiles.isEmpty) {
- val segmentsToPrePrime = segmentFiles.asScala.map(iterator => iterator.getSegmentNo).toSeq
- DistributedRDDUtils
- .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
- operationContext, hadoopConf, segmentsToPrePrime.toList)
- }
- }
- return null
- }
- val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
- .asInstanceOf[String]
- if (loadStatus == SegmentStatus.LOAD_FAILURE) {
- // update the load entry in table status file for changing the status to marked for delete
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
- LOGGER.info("********starting clean up**********")
- if (carbonLoadModel.isCarbonTransactionalTable) {
- // delete segment is applicable for transactional table
- CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
- clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+ return null
}
- LOGGER.info("********clean up done**********")
- LOGGER.warn("Cannot write load metadata file as data load failed")
- throw new Exception(errorMessage)
- } else {
- // check if data load fails due to bad record and throw data load failure due to
- // bad record exception
- if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
- status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
- carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+ val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
+ .asInstanceOf[String]
+ if (loadStatus == SegmentStatus.LOAD_FAILURE) {
// update the load entry in table status file for changing the status to marked for delete
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
LOGGER.info("********starting clean up**********")
@@ -573,114 +562,138 @@ object CarbonDataRDDFactory {
clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
}
LOGGER.info("********clean up done**********")
- throw new Exception(status(0)._2._2.errorMsg)
- }
- // as no record loaded in new segment, new segment should be deleted
- val newEntryLoadStatus =
- if (carbonLoadModel.isCarbonTransactionalTable &&
- !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV &&
- !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
- LOGGER.warn("Cannot write load metadata file as there is no data to load")
- SegmentStatus.MARKED_FOR_DELETE
- } else {
- loadStatus
+ LOGGER.warn("Cannot write load metadata file as data load failed")
+ throw new Exception(errorMessage)
+ } else {
+ // check if data load fails due to bad record and throw data load failure due to
+ // bad record exception
+ if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
+ status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
+ carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+ // update the load entry in table status file for changing the status to marked for delete
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
+ LOGGER.info("********starting clean up**********")
+ if (carbonLoadModel.isCarbonTransactionalTable) {
+ // delete segment is applicable for transactional table
+ CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+ clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+ }
+ LOGGER.info("********clean up done**********")
+ throw new Exception(status(0)._2._2.errorMsg)
}
+ // as no record loaded in new segment, new segment should be deleted
+ val newEntryLoadStatus =
+ if (carbonLoadModel.isCarbonTransactionalTable &&
+ !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isMV &&
+ !CarbonLoaderUtil.isValidSegment(carbonLoadModel,
+ carbonLoadModel.getSegmentId.toInt)) {
+ LOGGER.warn("Cannot write load metadata file as there is no data
to load")
+ SegmentStatus.MARKED_FOR_DELETE
+ } else {
+ loadStatus
+ }
- val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
- carbonLoadModel.getSegmentId,
- segmentMetaDataAccumulator)
- val segmentFileName =
- SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
- String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
- // clear segmentMetaDataAccumulator
- segmentMetaDataAccumulator.reset()
-
- SegmentFileStore.updateTableStatusFile(
- carbonTable,
- carbonLoadModel.getSegmentId,
- segmentFileName,
- carbonTable.getCarbonTableIdentifier.getTableId,
- new SegmentFileStore(carbonTable.getTablePath, segmentFileName))
+ val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
+ carbonLoadModel.getSegmentId,
+ segmentMetaDataAccumulator)
+ val segmentFileName =
+ SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
+ String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
+ // clear segmentMetaDataAccumulator
+ segmentMetaDataAccumulator.reset()
- operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
- carbonLoadModel.getSegmentId)
- val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
- new LoadTablePreStatusUpdateEvent(
- carbonTable.getCarbonTableIdentifier,
- carbonLoadModel)
- OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
- val (done, writtenSegment) =
- updateTableStatus(
- sqlContext.sparkSession,
- status,
- carbonLoadModel,
- newEntryLoadStatus,
- overwriteTable,
+ SegmentFileStore.updateTableStatusFile(
+ carbonTable,
+ carbonLoadModel.getSegmentId,
segmentFileName,
- updateModel,
- uniqueTableStatusId)
- val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
- new LoadTablePostStatusUpdateEvent(carbonLoadModel)
- val commitComplete = try {
+ carbonTable.getCarbonTableIdentifier.getTableId,
+ new SegmentFileStore(carbonTable.getTablePath, segmentFileName))
+
+ operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
+ carbonLoadModel.getSegmentId)
+ val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+ new LoadTablePreStatusUpdateEvent(
+ carbonTable.getCarbonTableIdentifier,
+ carbonLoadModel)
OperationListenerBus.getInstance()
- .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
- true
- } catch {
- case ex: Exception =>
- LOGGER.error("Problem while committing indexes", ex)
- false
- }
- if (!done || !commitComplete) {
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
- LOGGER.info("********starting clean up**********")
- if (carbonLoadModel.isCarbonTransactionalTable) {
- // delete segment is applicable for transactional table
- CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
- // delete corresponding segment file from metadata
- val segmentFile = CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
- File.separator + segmentFileName
- FileFactory.deleteFile(segmentFile)
- clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+ .fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+ val (done, writtenSegment) =
+ updateTableStatus(
+ sqlContext.sparkSession,
+ status,
+ carbonLoadModel,
+ newEntryLoadStatus,
+ overwriteTable,
+ segmentFileName,
+ updateModel,
+ uniqueTableStatusId)
+ val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+ new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+ val commitComplete = try {
+ OperationListenerBus.getInstance()
+ .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+ true
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Problem while committing indexes", ex)
+ false
+ }
+ if (!done || !commitComplete) {
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
+ LOGGER.info("********starting clean up**********")
+ if (carbonLoadModel.isCarbonTransactionalTable) {
+ // delete segment is applicable for transactional table
+ CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+ // delete corresponding segment file from metadata
+ val segmentFile =
+ CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
+ File.separator + segmentFileName
+ FileFactory.deleteFile(segmentFile)
+ clearIndexFiles(carbonTable, carbonLoadModel.getSegmentId)
+ }
+ LOGGER.info("********clean up done**********")
+ LOGGER.error("Data load failed due to failure in table status
updation.")
+ throw new Exception("Data load failed due to failure in table status
updation.")
+ }
+ if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
+ LOGGER.info("Data load is partially successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
+ } else {
+ LOGGER.info("Data load is successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
}
- LOGGER.info("********clean up done**********")
- LOGGER.error("Data load failed due to failure in table status
updation.")
- throw new Exception("Data load failed due to failure in table status
updation.")
- }
- if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
- LOGGER.info("Data load is partially successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
- } else {
- LOGGER.info("Data load is successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
- }
- // code to handle Pre-Priming cache for loading
+ // code to handle Pre-Priming cache for loading
- if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
- DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
- operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
- }
- try {
- // compaction handling
- if (carbonTable.isHivePartitionTable) {
- carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+ if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+ DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
+ operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
}
- val compactedSegments = new util.ArrayList[String]()
- handleSegmentMerging(sqlContext,
- carbonLoadModel
- .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
- carbonTable,
- compactedSegments,
- operationContext)
- carbonLoadModel.setMergedSegmentIds(compactedSegments)
- writtenSegment
- } catch {
- case e: Exception =>
- LOGGER.error(
- "Auto-Compaction has failed. Ignoring this exception because the" +
- " load is passed.", e)
+ try {
+ // compaction handling
+ if (carbonTable.isHivePartitionTable) {
+ carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+ }
+ val compactedSegments = new util.ArrayList[String]()
+ handleSegmentMerging(sqlContext,
+ carbonLoadModel
+ .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
+ carbonTable,
+ compactedSegments,
+ operationContext)
+ carbonLoadModel.setMergedSegmentIds(compactedSegments)
writtenSegment
+ } catch {
+ case e: Exception =>
+ LOGGER.error(
+ "Auto-Compaction has failed. Ignoring this exception because
the" +
+ " load is passed.", e)
+ writtenSegment
+ }
}
+ } finally {
+ // Release the segment lock, once table status is finally updated
+ segmentLock.unlock()
}
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 61c25e9..86576fa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -274,19 +274,21 @@ object CarbonIndexUtil {
if (isLoadToFailedSISegments && null != failedLoadMetaDataDetils) {
val metadata = CarbonInternalLoaderUtil
.getListOfValidSlices(SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath))
+ segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
+ .getSegmentToLoadStartTimeMapping(carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
+ .asScala
failedLoadMetaDataDetils.asScala.foreach(loadMetaDetail => {
// check whether this segment is valid or invalid, if it is present in the valid list
- // then don't consider it for reloading
- if (!metadata.contains(loadMetaDetail.getLoadName)) {
+ // then don't consider it for reloading.
+ // Also main table should have this as a valid segment for reloading.
+ if (!metadata.contains(loadMetaDetail.getLoadName) &&
+ segmentIdToLoadStartTimeMapping.contains(loadMetaDetail.getLoadName)) {
segmentsToReload.append(loadMetaDetail.getLoadName)
}
})
LOGGER.info(
s"SI segments to be reloaded for index table: ${
indexTable.getTableUniqueName} are: ${segmentsToReload}")
- segmentIdToLoadStartTimeMapping = CarbonInternalLoaderUtil
- .getSegmentToLoadStartTimeMapping(carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
- .asScala
} else {
segmentIdToLoadStartTimeMapping = scala.collection.mutable
.Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index ccb1e2a..f6a8835 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -235,22 +235,20 @@ public class CarbonInternalLoaderUtil {
*
*/
public static boolean updateLoadMetadataWithMergeStatus(CarbonTable indexCarbonTable,
- String[] loadsToMerge, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
- Map<String, String> segmentToLoadStartTimeMap, long mergeLoadStartTime,
- SegmentStatus segmentStatus, long newLoadStartTime, List<String> rebuiltSegments)
- throws IOException {
+ String[] loadsToMerge, String mergedLoadNumber, Map<String, String> segmentToLoadStartTimeMap,
+ long mergeLoadStartTime, SegmentStatus segmentStatus, long newLoadStartTime,
+ List<String> rebuiltSegments) throws IOException {
boolean tableStatusUpdationStatus = false;
List<String> loadMergeList = new ArrayList<>(Arrays.asList(loadsToMerge));
- AbsoluteTableIdentifier absoluteTableIdentifier =
- carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager =
+ new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
try {
if (carbonLock.lockWithRetries()) {
- LOGGER.info("Acquired lock for the table " +
carbonLoadModel.getDatabaseName() + "."
- + carbonLoadModel.getTableName() + " for table status updation ");
+ LOGGER.info("Acquired lock for the table " +
indexCarbonTable.getDatabaseName() + "."
+ + indexCarbonTable.getTableName() + " for table status updation ");
LoadMetadataDetails[] loadDetails =
SegmentStatusManager.readLoadMetadata(indexCarbonTable.getMetadataPath());
@@ -305,17 +303,17 @@ public class CarbonInternalLoaderUtil {
tableStatusUpdationStatus = true;
} else {
LOGGER.error(
- "Could not able to obtain lock for table" +
carbonLoadModel.getDatabaseName() + "."
- + carbonLoadModel.getTableName() + "for table status
updation");
+ "Could not able to obtain lock for table" +
indexCarbonTable.getDatabaseName() + "."
+ + indexCarbonTable.getTableName() + "for table status
updation");
}
} finally {
if (carbonLock.unlock()) {
- LOGGER.info("Table unlocked successfully after table status updation"
+ carbonLoadModel
- .getDatabaseName() + "." + carbonLoadModel.getTableName());
+ LOGGER.info("Table unlocked successfully after table status updation"
+ indexCarbonTable
+ .getDatabaseName() + "." + indexCarbonTable.getTableName());
} else {
LOGGER.error(
- "Unable to unlock Table lock for table" +
carbonLoadModel.getDatabaseName() + "."
- + carbonLoadModel.getTableName() + " during table status
updation");
+ "Unable to unlock Table lock for table" +
indexCarbonTable.getDatabaseName() + "."
+ + indexCarbonTable.getTableName() + " during table status
updation");
}
}
return tableStatusUpdationStatus;
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index e1f009d..39b3d94 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -85,7 +85,6 @@ object Compactor {
indexCarbonTable,
loadsToMerge,
validSegments.head,
- carbonLoadModel,
segmentToSegmentTimestampMap,
segmentIdToLoadStartTimeMapping(validSegments.head),
SegmentStatus.INSERT_IN_PROGRESS, 0L, List.empty.asJava)
@@ -119,7 +118,6 @@ object Compactor {
indexCarbonTable,
loadsToMerge,
validSegments.head,
- carbonLoadModel,
segmentToSegmentTimestampMap,
segmentIdToLoadStartTimeMapping(validSegments.head),
SegmentStatus.SUCCESS,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 298be7a..498c739 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -220,24 +220,32 @@ object SecondaryIndexUtil {
segment
}
- val endTime = System.currentTimeMillis()
- val loadMetadataDetails = SegmentStatusManager
- .readLoadMetadata(indexCarbonTable.getMetadataPath)
- loadMetadataDetails.foreach(loadMetadataDetail => {
- if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
- loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
- loadMetadataDetail.setLoadEndTime(endTime)
- CarbonLoaderUtil
- .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
- loadMetadataDetail.getLoadName,
- indexCarbonTable)
+ val statusLock =
+ new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
+ try {
+ if (statusLock.lockWithRetries()) {
+ val endTime = System.currentTimeMillis()
+ val loadMetadataDetails = SegmentStatusManager
+ .readLoadMetadata(indexCarbonTable.getMetadataPath)
+ loadMetadataDetails.foreach(loadMetadataDetail => {
+ if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
+ loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
+ loadMetadataDetail.setLoadEndTime(endTime)
+ CarbonLoaderUtil
+ .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
+ loadMetadataDetail.getLoadName,
+ indexCarbonTable)
+ }
+ })
+ SegmentStatusManager
+ .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
+ loadMetadataDetails)
}
- })
-
- SegmentStatusManager
- .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
- loadMetadataDetails)
-
+ } finally {
+ if (statusLock != null) {
+ statusLock.unlock()
+ }
+ }
// clear the indexSchema cache for the merged segments, as the index files and
// data files are rewritten after compaction
if (mergedSegments.size > 0) {
@@ -435,24 +443,32 @@ object SecondaryIndexUtil {
val loadFolderDetailsArrayMainTable =
SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetadataPath)
indexTables.asScala.foreach { indexTable =>
- val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath)
- if (CarbonUtil.isFileExists(tableStatusFilePath)) {
- val loadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(indexTable
- .getMetadataPath);
- if (null != loadFolderDetailsArray && loadFolderDetailsArray.nonEmpty) {
- try {
- SegmentStatusManager.writeLoadDetailsIntoFile(
- CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath),
- updateTimeStampForIndexTable(loadFolderDetailsArrayMainTable,
- loadFolderDetailsArray))
- } catch {
- case ex: Exception =>
- LOGGER.error(ex.getMessage);
+ val statusLock =
+ new SegmentStatusManager(indexTable.getAbsoluteTableIdentifier).getTableStatusLock
+ try {
+ if (statusLock.lockWithRetries()) {
+ val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath)
+ if (CarbonUtil.isFileExists(tableStatusFilePath)) {
+ val loadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(indexTable
+ .getMetadataPath);
+ if (null != loadFolderDetailsArray && loadFolderDetailsArray.nonEmpty) {
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(indexTable.getTablePath),
+ updateTimeStampForIndexTable(loadFolderDetailsArrayMainTable,
+ loadFolderDetailsArray))
+ }
+ } else {
+ LOGGER.info(
+ "Table status file does not exist for index table: " +
indexTable.getTableUniqueName)
}
}
- } else {
- LOGGER.info(
- "Table status file does not exist for index table: " +
indexTable.getTableUniqueName)
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex.getMessage);
+ } finally {
+ if (statusLock != null) {
+ statusLock.unlock()
+ }
}
}
}