Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670016



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table 
table
-      // status file and get the skipped segments if any
-      
CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + 
newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail 
!= null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not 
have
-              // compacted segments due to some failure while compaction, need 
to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, 
then SI
-                // segments are updated first in table status and then the 
main table
-                // segment, so in any load runs parallel this listener 
shouldn't consider
-                // those segments accidentally. So try to take the segment 
lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + 
detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + 
carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and 
main table
+        // compaction might be still in progress. In those cases, we can try 
to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, 
to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = 
SegmentStatusManager.readLoadMetadata(carbonTable
+            .getMetadataPath)
+          // check for the skipped segments. compare the main table and SI 
table table
+          // status file and get the skipped segments if any
+          
CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => 
metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = newMainTableDetails
+                  .filter(metadata => 
metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + 
" for SI" +
+                    " table " + indexTableName + "." + 
carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && 
metadataDetail != null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does 
not have
+                  // compacted segments due to some failure while compaction, 
need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == 
SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on 
table, then SI
+                    // segments are updated first in table status and then the 
main table
+                    // segment, so in any load runs parallel this listener 
shouldn't consider
+                    // those segments accidentally. So try to take the segment 
lock.
+                    val segmentLockOfProbableOnCompactionSeg = 
CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                        LockUsage.LOCK)
+                    if 
(segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + 
detail(0).getLoadName + " for SI "
+                        + "table " + indexTableName + "." + 
carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
-            }
-          }
-        })
+            })
+        } else {
+          LOGGER.error(
+            "Unable to obtain compaction lock for table" + 
carbonTable.getTableUniqueName)

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table 
table
-      // status file and get the skipped segments if any
-      
CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + 
newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail 
!= null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not 
have
-              // compacted segments due to some failure while compaction, need 
to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, 
then SI
-                // segments are updated first in table status and then the 
main table
-                // segment, so in any load runs parallel this listener 
shouldn't consider
-                // those segments accidentally. So try to take the segment 
lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + 
detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + 
carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and 
main table
+        // compaction might be still in progress. In those cases, we can try 
to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, 
to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to