This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f9027  [CARBONDATA-4100] Fix SI segments are in inconsistent state 
with maintable after concurrent Load & Compaction operation
19f9027 is described below

commit 19f902785351f31be44fa89268d8a4b1854dc347
Author: Indhumathi27 <[email protected]>
AuthorDate: Mon Dec 28 21:38:10 2020 +0530

    [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable 
after concurrent Load & Compaction operation
    
    Why is this PR needed?
    When concurrent LOAD and COMPACTION operations are in progress on a main
table that has an SI, the SILoadEventListenerForFailedSegments listener is
called to repair failed SI segments, if any. It compares the SI and main table
segment statuses; if there is a mismatch, it adds that specific load to
failedLoads so that it is loaded again.
    
    During compaction, the SI is updated first and then the main table. So, in
some cases, the SI segment will be in COMPACTED state while the main table
segment is still in SUCCESS state (either because the compaction is still in
progress or because of some operation failure). SI repair adds those segments
to failedLoads after checking whether the segment lock can be acquired. However,
if the main table compaction has finished by the time the SI repair comparison
is done, the repair can still acquire the segment lock and add those loads to
failedL [...]
    
    What changes were proposed in this PR?
    Acquire the compaction lock on the main table (to ensure compaction is not
running), and then compare the SI and main table load details to repair SI
segments.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No (concurrent scenario)
    
    This closes #4067
---
 .../command/index/IndexRepairCommand.scala         |  42 +--
 .../apache/spark/sql/index/CarbonIndexUtil.scala   | 325 ++++++++++++---------
 .../SILoadEventListenerForFailedSegments.scala     |   7 +-
 3 files changed, 190 insertions(+), 184 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
index 43efba8..6bcea43 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
@@ -26,11 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.index.CarbonIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.exception.ConcurrentOperationException
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
 
 /**
@@ -73,26 +69,11 @@ extends DataCommand {
       .lookupRelation(Some(databaseName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation].carbonTable
 
-    val tableStatusLock = CarbonLockFactory
-      .getCarbonLockObj(mainCarbonTable.getAbsoluteTableIdentifier, 
LockUsage.TABLE_STATUS_LOCK)
-      val carbonLoadModel = new CarbonLoadModel
-      carbonLoadModel.setDatabaseName(databaseName)
-      carbonLoadModel.setTableName(tableName)
-      carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
-    try {
-      if (tableStatusLock.lockWithRetries()) {
-        val tableStatusFilePath = CarbonTablePath
-          .getTableStatusFilePath(mainCarbonTable.getTablePath)
-        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
-          .readTableStatusFile(tableStatusFilePath).toList.asJava)
-        carbonLoadModel.setCarbonDataLoadSchema(new 
CarbonDataLoadSchema(mainCarbonTable))
-      } else {
-        throw new ConcurrentOperationException(mainCarbonTable.getDatabaseName,
-          mainCarbonTable.getTableName, "table status read", "reindex command")
-      }
-    } finally {
-      tableStatusLock.unlock()
-    }
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableName)
+    carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+    carbonLoadModel.setCarbonDataLoadSchema(new 
CarbonDataLoadSchema(mainCarbonTable))
     val indexMetadata = mainCarbonTable.getIndexMetadata
     val secondaryIndexProvider = IndexType.SI.getIndexProviderName
     if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
@@ -101,19 +82,11 @@ extends DataCommand {
         .get(secondaryIndexProvider).keySet().asScala
       // if there are no index tables for a given fact table do not perform 
any action
       if (indexTables.nonEmpty) {
-        val mainTableDetails = if (segments.isEmpty) {
-          carbonLoadModel.getLoadMetadataDetails.asScala.toList
-        } else {
-          // get segments for main table
-          carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
-            loadMetaDataDetails => 
segments.get.contains(loadMetaDataDetails.getLoadName))
-        }
         if (indexTableToRepair.isEmpty) {
           indexTables.foreach {
             indexTableName =>
               CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, 
carbonLoadModel,
-                indexMetadata, mainTableDetails, secondaryIndexProvider,
-                Integer.MAX_VALUE)(sparkSession)
+                indexMetadata, secondaryIndexProvider, Integer.MAX_VALUE, 
segments)(sparkSession)
           }
         } else {
           val indexTablesToRepair = indexTables.filter(indexTable => indexTable
@@ -121,8 +94,7 @@ extends DataCommand {
           indexTablesToRepair.foreach {
             indexTableName =>
               CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, 
carbonLoadModel,
-                indexMetadata, mainTableDetails, secondaryIndexProvider,
-                Integer.MAX_VALUE)(sparkSession)
+                indexMetadata, secondaryIndexProvider, Integer.MAX_VALUE, 
segments)(sparkSession)
           }
           if (indexTablesToRepair.isEmpty) {
             throw new Exception("Unable to find index table" + 
indexTableToRepair.get)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 85d8ffb..3f229b0 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -37,6 +37,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
@@ -393,10 +394,10 @@ object CarbonIndexUtil {
   }
 
   def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
-    carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
-      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: 
String,
-                      repairLimit: Int)
-  (sparkSession: SparkSession) : Unit = {
+      carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      secondaryIndexProvider: String, repairLimit: Int,
+      segments: Option[List[String]] = Option.empty,
+      isLoadOrCompaction: Boolean = false)(sparkSession: SparkSession): Unit = 
{
     // when Si creation and load to main table are parallel, get the 
carbonTable from the
     // metastore which will have the latest index Info
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -406,158 +407,194 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = 
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will 
take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main 
table
+      // compaction might be still in progress. In those cases, we can try to 
take compaction lock
+      // on main table and then compare and add SI segments to failedLoads, to 
avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          
SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case exception: Exception =>
+            if (!isLoadOrCompaction) {
+              throw exception
+            }
+            return;
+        }
+        carbonLoadModel.setLoadMetadataDetails(mainTableDetails.toList.asJava)
+        if (segments.isDefined) {
+          mainTableDetails = mainTableDetails.filter(
+            loadMetaDataDetails => 
segments.get.contains(loadMetaDataDetails.getLoadName))
+        }
+        val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+        if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+          mainTableDetails,
+          siTblLoadMetadataDetails)) {
+          val indexColumns = 
indexMetadata.getIndexColumns(secondaryIndexProvider,
+            indexTableName)
+          val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
+            indexMetadata.getParentTableName,
+            indexColumns.split(",").toList,
+            indexTableName)
+
+          // If it empty, then no need to do further computations because the
+          // tabletstatus might not have been created and hence next load will 
take care
+          if (siTblLoadMetadataDetails.isEmpty) {
+            Seq.empty
+          }
 
-      val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new 
util
-      .ArrayList[LoadMetadataDetails]()
-
-      // read the details of SI table and get all the failed segments during SI
-      // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
-      siTblLoadMetadataDetails.foreach {
-        case loadMetaDetail: LoadMetadataDetails =>
-          if (loadMetaDetail.getSegmentStatus == 
SegmentStatus.MARKED_FOR_DELETE &&
-            checkIfMainTableLoadIsValid(mainTableDetails.toArray,
-              loadMetaDetail.getLoadName) && repairLimit > 
failedLoadMetadataDetails.size() ) {
-            failedLoadMetadataDetails.add(loadMetaDetail)
-          } else if ((loadMetaDetail.getSegmentStatus ==
-            SegmentStatus.INSERT_IN_PROGRESS ||
-            loadMetaDetail.getSegmentStatus ==
-              SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
-            checkIfMainTableLoadIsValid(mainTableDetails.toArray,
-              loadMetaDetail.getLoadName) && repairLimit > 
failedLoadMetadataDetails.size()) {
-            val segmentLock = CarbonLockFactory
-              .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
-                CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
-                  LockUsage.LOCK)
-            try {
-              if (segmentLock.lockWithRetries(1, 0)) {
-                LOGGER
-                  .info("SIFailedLoadListener: Acquired segment lock on 
segment:" +
-                    loadMetaDetail.getLoadName)
+          val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = 
new util
+          .ArrayList[LoadMetadataDetails]()
+
+          // read the details of SI table and get all the failed segments 
during SI
+          // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
+          siTblLoadMetadataDetails.foreach {
+            case loadMetaDetail: LoadMetadataDetails =>
+              val isMainTableLoadValid = 
checkIfMainTableLoadIsValid(mainTableDetails,
+                loadMetaDetail.getLoadName)
+              if (loadMetaDetail.getSegmentStatus == 
SegmentStatus.MARKED_FOR_DELETE &&
+                  isMainTableLoadValid && repairLimit > 
failedLoadMetadataDetails.size()) {
                 failedLoadMetadataDetails.add(loadMetaDetail)
+              } else if ((loadMetaDetail.getSegmentStatus ==
+                          SegmentStatus.INSERT_IN_PROGRESS ||
+                          loadMetaDetail.getSegmentStatus ==
+                          SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
+                         isMainTableLoadValid && repairLimit > 
failedLoadMetadataDetails.size()) {
+                val segmentLock = CarbonLockFactory
+                  .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
+                    
CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
+                    LockUsage.LOCK)
+                try {
+                  if (segmentLock.lockWithRetries(1, 0)) {
+                    LOGGER
+                      .info("SIFailedLoadListener: Acquired segment lock on 
segment:" +
+                            loadMetaDetail.getLoadName)
+                    failedLoadMetadataDetails.add(loadMetaDetail)
+                  }
+                } finally {
+                  segmentLock.unlock()
+                  LOGGER
+                    .info("SIFailedLoadListener: Released segment lock on 
segment:" +
+                          loadMetaDetail.getLoadName)
+                }
               }
-            } finally {
-              segmentLock.unlock()
-              LOGGER
-                .info("SIFailedLoadListener: Released segment lock on 
segment:" +
-                  loadMetaDetail.getLoadName)
-            }
           }
-      }
-      // check for the skipped segments. compare the main table and SI table 
table
-      // status file and get the skipped segments if any
-      
CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + 
newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail 
!= null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not 
have
-              // compacted segments due to some failure while compaction, need 
to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, 
then SI
-                // segments are updated first in table status and then the 
main table
-                // segment, so in any load runs parallel this listener 
shouldn't consider
-                // those segments accidentally. So try to take the segment 
lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + 
detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + 
carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+
+          // check for the skipped segments. compare the main table and SI 
table table
+          // status file and get the skipped segments if any
+          
CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => 
metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = mainTableDetails
+                  .filter(metadata => 
metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + 
" for SI" +
+                    " table " + indexTableName + "." + 
carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && 
metadataDetail != null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does 
not have
+                  // compacted segments due to some failure while compaction, 
need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == 
SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on 
table, then SI
+                    // segments are updated first in table status and then the 
main table
+                    // segment, so in any load runs parallel this listener 
shouldn't consider
+                    // those segments accidentally. So try to take the segment 
lock.
+                    val segmentLockOfProbableOnCompactionSeg = 
CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                        LockUsage.LOCK)
+                    if 
(segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + 
detail(0).getLoadName + " for SI "
+                        + "table " + indexTableName + "." + 
carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
+            })
+
+          try {
+            if (!failedLoadMetadataDetails.isEmpty) {
+              // in the case when in SI table a segment is deleted and it's 
entry is
+              // deleted from the tablestatus file, the corresponding .segment 
file from
+              // the metadata folder should also be deleted as it contains the
+              // mergefilename which does not exist anymore as the segment is 
deleted.
+              deleteStaleSegmentFileIfPresent(carbonLoadModel,
+                indexTable,
+                failedLoadMetadataDetails)
+              CarbonIndexUtil
+                .LoadToSITable(sparkSession,
+                  carbonLoadModel,
+                  indexTableName,
+                  isLoadToFailedSISegments = true,
+                  indexModel,
+                  carbonTable, indexTable, false, failedLoadMetadataDetails)
             }
-          }
-        })
-      try {
-        if (!failedLoadMetadataDetails.isEmpty) {
-          // in the case when in SI table a segment is deleted and it's entry 
is
-          // deleted from the tablestatus file, the corresponding .segment 
file from
-          // the metadata folder should also be deleted as it contains the
-          // mergefilename which does not exist anymore as the segment is 
deleted.
-          deleteStaleSegmentFileIfPresent(carbonLoadModel,
-            indexTable,
-            failedLoadMetadataDetails)
-          CarbonIndexUtil
-            .LoadToSITable(sparkSession,
-              carbonLoadModel,
-              indexTableName,
-              isLoadToFailedSISegments = true,
-              indexModel,
-              carbonTable, indexTable, false, failedLoadMetadataDetails)
-
-          // get the current load metadata details of the index table
-          // details = 
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-        }
-
-        // get updated main table segments and si table segments
-        val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-        val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
 
-        // check if main table has load in progress and SI table has no load
-        // in progress entry, then no need to enable the SI table
-        // Only if the valid segments of maintable match the valid segments of 
SI
-        // table then we can enable the SI for query
-        if (CarbonInternalLoaderUtil
-          .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
-            siTblLoadMetadataDetails)
-          && 
CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
-          mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
-          // enable the SI table if it was disabled earlier due to failure 
during SI
-          // creation time
-          sparkSession.sql(
-            s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName 
SET
-               |SERDEPROPERTIES ('isSITableEnabled' = 
'true')""".stripMargin).collect()
+            // get updated main table segments and si table segments
+            val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+              
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+            val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+              SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+            // check if main table has load in progress and SI table has no 
load
+            // in progress entry, then no need to enable the SI table
+            // Only if the valid segments of maintable match the valid 
segments of SI
+            // table then we can enable the SI for query
+            if (CarbonInternalLoaderUtil
+                  .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+                    siTblLoadMetadataDetails)
+                && 
CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+              mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
+              // enable the SI table if it was disabled earlier due to failure 
during SI
+              // creation time
+              sparkSession.sql(
+                s"""ALTER TABLE ${ carbonLoadModel.getDatabaseName 
}.$indexTableName SET
+                   |SERDEPROPERTIES ('isSITableEnabled' = 
'true')""".stripMargin).collect()
+            }
+          } catch {
+            case ex: Exception =>
+              // in case of SI load only for for failed segments, catch the 
exception, but
+              // do not fail the main table load, as main table segments 
should be available
+              // for query
+              LOGGER.error(s"Load to SI table to $indexTableName is failed " +
+                           s"or SI table ENABLE is failed. ", ex)
+              Seq.empty
+          } finally {
+            segmentLocks.foreach {
+              segmentLock => segmentLock.unlock()
+            }
+          }
         }
-      } catch {
-        case ex: Exception =>
-          // in case of SI load only for for failed segments, catch the 
exception, but
-          // do not fail the main table load, as main table segments should be 
available
-          // for query
-          LOGGER.error(s"Load to SI table to $indexTableName is failed " +
-            s"or SI table ENABLE is failed. ", ex)
-          Seq.empty
-      } finally {
-        segmentLocks.foreach {
-          segmentLock => segmentLock.unlock()
+      } else {
+        LOGGER.error(s"Didn't check failed segments for index 
[$indexTableName] as compaction " +
+                     s"is progress on ${ carbonTable.getTableUniqueName }. " +
+                     s"Please call SI repair again")
+        if (!isLoadOrCompaction) {
+          throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+            carbonTable.getTableName, "compaction", "reindex command")
         }
       }
+    } finally {
+      compactionLock.unlock()
     }
     Seq.empty
   }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index a1e6fc1..a7677ea 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.index.CarbonIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{Event, OperationContext, 
OperationEventListener}
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
@@ -70,13 +69,11 @@ class SILoadEventListenerForFailedSegments extends 
OperationEventListener with L
               carbonLoadModel.getTableName + " are : " + maxSegmentRepairLimit)
             // if there are no index tables for a given fact table do not 
perform any action
             if (indexTables.nonEmpty) {
-              val mainTableDetails =
-                
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
               indexTables.foreach {
                 indexTableName =>
                   CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, 
carbonLoadModel,
-                    indexMetadata, mainTableDetails.toList, 
secondaryIndexProvider,
-                    maxSegmentRepairLimit)(sparkSession)
+                    indexMetadata, secondaryIndexProvider,
+                    maxSegmentRepairLimit, isLoadOrCompaction = 
true)(sparkSession)
               }
             }
           }

Reply via email to