Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670126
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
}
}
}
- // check for the skipped segments. compare the main table and SI table
table
- // status file and get the skipped segments if any
-
CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
- .foreach(metadataDetail => {
- if (repairLimit > failedLoadMetadataDetails.size()) {
- val detail = siTblLoadMetadataDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- val mainTableDetail = mainTableDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- if (null == detail || detail.length == 0) {
- val newDetails = new LoadMetadataDetails
- newDetails.setLoadName(metadataDetail)
- LOGGER.error("Added in SILoadFailedSegment " +
newDetails.getLoadName + " for SI" +
- " table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(newDetails)
- } else if (detail != null && detail.length != 0 && metadataDetail
!= null
- && metadataDetail.length != 0) {
- // If SI table has compacted segments and main table does not
have
- // compacted segments due to some failure while compaction, need
to
- // reload the original segments in this case.
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
- mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
- detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- // in concurrent scenario, if a compaction is going on table,
then SI
- // segments are updated first in table status and then the
main table
- // segment, so in any load runs parallel this listener
shouldn't consider
- // those segments accidentally. So try to take the segment
lock.
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
- LockUsage.LOCK)
- if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
- segmentLocks += segmentLockOfProbableOnCompactionSeg
- LOGGER.error("Added in SILoadFailedSegment " +
detail(0).getLoadName + " for SI "
- + "table " + indexTableName + "." +
carbonTable.getTableName)
- failedLoadMetadataDetails.add(detail(0))
+ val carbonLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
+ try {
+ // In some cases, SI table segment might be in COMPACTED state and
main table
+ // compaction might be still in progress. In those cases, we can try
to take compaction lock
+ // on main table and then compare and add SI segments to failedLoads,
to avoid repair
+ // SI SUCCESS loads.
+ if (carbonLock.lockWithRetries(3, 0)) {
+ val newMainTableDetails =
SegmentStatusManager.readLoadMetadata(carbonTable
Review comment:
modified
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]