This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 19f9027 [CARBONDATA-4100] Fix SI segments are in inconsistent state
with maintable after concurrent Load & Compaction operation
19f9027 is described below
commit 19f902785351f31be44fa89268d8a4b1854dc347
Author: Indhumathi27 <[email protected]>
AuthorDate: Mon Dec 28 21:38:10 2020 +0530
[CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable
after concurrent Load & Compaction operation
Why is this PR needed?
When concurrent LOAD and COMPACTION are in progress on a main table having an SI,
the SILoadEventListenerForFailedSegments listener is called to repair any failed
SI segments. It compares the SI and main table segment status; if there is a
mismatch, it adds that specific load to failedLoads to be re-loaded.
During compaction, the SI is updated first and then the main table. So, in some
cases, an SI segment will be in COMPACTED state while the main table segment is
still in SUCCESS state (because the compaction is still in progress, or because
of some operation failure). SI repair adds those segments to failedLoads after
checking whether the segment lock can be acquired. But if the main table
compaction has finished by the time the SI repair comparison is done, the repair
can still acquire the segment lock and add those loads to failedL [...]
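For illustration, a minimal sketch of the pre-existing mismatch check described
above (assuming the status APIs that appear in the diff below; detail and
mainTableDetail stand for the per-segment entries of the SI table and the main
table respectively):

  // SI was compacted first; the main table segment still reads SUCCESS,
  // either because its compaction is still running or because it failed
  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
      mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
    // the old code only guarded this with a per-segment lock, which can be
    // acquired again once the main table compaction has finished -- the
    // race this PR closes
    failedLoadMetadataDetails.add(detail(0))
  }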
What changes were proposed in this PR?
Acquire the compaction lock on the main table (to ensure no compaction is
running), and then compare the SI and main table load details to repair the SI
segments.
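For illustration, a minimal sketch of the locking pattern (assuming the lock and
segment-status APIs used in the diff below; carbonTable and indexTable are
placeholders for the main table and its SI):

  val compactionLock = CarbonLockFactory.getCarbonLockObj(
    carbonTable.getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
  try {
    // holding the compaction lock guarantees no compaction can start, so the
    // main table / SI status comparison below is race-free
    if (compactionLock.lockWithRetries()) {
      val mainDetails = SegmentStatusManager.readTableStatusFile(
        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
      val siDetails = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
      if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(mainDetails, siDetails)) {
        // collect mismatched loads into failedLoads and re-load them to the SI
      }
    } else {
      // compaction is running: skip the repair for load/compaction callers, or
      // throw ConcurrentOperationException for an explicit REINDEX command
    }
  } finally {
    compactionLock.unlock()
  }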
Does this PR introduce any user interface change?
No
Is any new testcase added?
No (concurrent scenario)
This closes #4067
---
.../command/index/IndexRepairCommand.scala | 42 +--
.../apache/spark/sql/index/CarbonIndexUtil.scala | 325 ++++++++++++---------
.../SILoadEventListenerForFailedSegments.scala | 7 +-
3 files changed, 190 insertions(+), 184 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
index 43efba8..6bcea43 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
@@ -26,11 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.exception.ConcurrentOperationException
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
/**
@@ -73,26 +69,11 @@ extends DataCommand {
.lookupRelation(Some(databaseName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation].carbonTable
- val tableStatusLock = CarbonLockFactory
- .getCarbonLockObj(mainCarbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
- val carbonLoadModel = new CarbonLoadModel
- carbonLoadModel.setDatabaseName(databaseName)
- carbonLoadModel.setTableName(tableName)
- carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
- try {
- if (tableStatusLock.lockWithRetries()) {
- val tableStatusFilePath = CarbonTablePath
- .getTableStatusFilePath(mainCarbonTable.getTablePath)
- carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
- .readTableStatusFile(tableStatusFilePath).toList.asJava)
- carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
- } else {
- throw new ConcurrentOperationException(mainCarbonTable.getDatabaseName,
- mainCarbonTable.getTableName, "table status read", "reindex command")
- }
- } finally {
- tableStatusLock.unlock()
- }
+ val carbonLoadModel = new CarbonLoadModel
+ carbonLoadModel.setDatabaseName(databaseName)
+ carbonLoadModel.setTableName(tableName)
+ carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+ carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
val indexMetadata = mainCarbonTable.getIndexMetadata
val secondaryIndexProvider = IndexType.SI.getIndexProviderName
if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
@@ -101,19 +82,11 @@ extends DataCommand {
.get(secondaryIndexProvider).keySet().asScala
// if there are no index tables for a given fact table do not perform any action
if (indexTables.nonEmpty) {
- val mainTableDetails = if (segments.isEmpty) {
- carbonLoadModel.getLoadMetadataDetails.asScala.toList
- } else {
- // get segments for main table
- carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
- loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
- }
if (indexTableToRepair.isEmpty) {
indexTables.foreach {
indexTableName =>
CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
- indexMetadata, mainTableDetails, secondaryIndexProvider,
- Integer.MAX_VALUE)(sparkSession)
+ indexMetadata, secondaryIndexProvider, Integer.MAX_VALUE, segments)(sparkSession)
}
} else {
val indexTablesToRepair = indexTables.filter(indexTable => indexTable
@@ -121,8 +94,7 @@ extends DataCommand {
indexTablesToRepair.foreach {
indexTableName =>
CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
- indexMetadata, mainTableDetails, secondaryIndexProvider,
- Integer.MAX_VALUE)(sparkSession)
+ indexMetadata, secondaryIndexProvider, Integer.MAX_VALUE, segments)(sparkSession)
}
if (indexTablesToRepair.isEmpty) {
throw new Exception("Unable to find index table" +
indexTableToRepair.get)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 85d8ffb..3f229b0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -37,6 +37,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
@@ -393,10 +394,10 @@ object CarbonIndexUtil {
}
def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
- carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
- mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String,
- repairLimit: Int)
- (sparkSession: SparkSession) : Unit = {
+ carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+ secondaryIndexProvider: String, repairLimit: Int,
+ segments: Option[List[String]] = Option.empty,
+ isLoadOrCompaction: Boolean = false)(sparkSession: SparkSession): Unit = {
// when Si creation and load to main table are parallel, get the carbonTable from the
// metastore which will have the latest index Info
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
@@ -406,158 +407,194 @@ object CarbonIndexUtil {
.asInstanceOf[CarbonRelation]
.carbonTable
- val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
- if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
- mainTableDetails.toArray,
- siTblLoadMetadataDetails)) {
- val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
- indexTableName)
- val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
- indexMetadata.getParentTableName,
- indexColumns.split(",").toList,
- indexTableName)
-
- // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- // If it empty, then no need to do further computations because the
- // tabletstatus might not have been created and hence next load will take care
- if (siTblLoadMetadataDetails.isEmpty) {
- Seq.empty
- }
+ val compactionLock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
+ try {
+ // In some cases, SI table segment might be in COMPACTED state and main table
+ // compaction might be still in progress. In those cases, we can try to take compaction lock
+ // on main table and then compare and add SI segments to failedLoads, to avoid repair
+ // SI SUCCESS loads.
+ if (compactionLock.lockWithRetries()) {
+ var mainTableDetails = try {
+ SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+ carbonTable.getTablePath))
+ } catch {
+ case exception: Exception =>
+ if (!isLoadOrCompaction) {
+ throw exception
+ }
+ return;
+ }
+ carbonLoadModel.setLoadMetadataDetails(mainTableDetails.toList.asJava)
+ if (segments.isDefined) {
+ mainTableDetails = mainTableDetails.filter(
+ loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+ }
+ val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+ if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+ mainTableDetails,
+ siTblLoadMetadataDetails)) {
+ val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
+ indexTableName)
+ val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
+ indexMetadata.getParentTableName,
+ indexColumns.split(",").toList,
+ indexTableName)
+
+ // If it empty, then no need to do further computations because the
+ // tabletstatus might not have been created and hence next load will take care
+ if (siTblLoadMetadataDetails.isEmpty) {
+ Seq.empty
+ }
- val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
- .ArrayList[LoadMetadataDetails]()
-
- // read the details of SI table and get all the failed segments during SI
- // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
- siTblLoadMetadataDetails.foreach {
- case loadMetaDetail: LoadMetadataDetails =>
- if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
- checkIfMainTableLoadIsValid(mainTableDetails.toArray,
- loadMetaDetail.getLoadName) && repairLimit > failedLoadMetadataDetails.size() ) {
- failedLoadMetadataDetails.add(loadMetaDetail)
- } else if ((loadMetaDetail.getSegmentStatus ==
- SegmentStatus.INSERT_IN_PROGRESS ||
- loadMetaDetail.getSegmentStatus ==
- SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
- checkIfMainTableLoadIsValid(mainTableDetails.toArray,
- loadMetaDetail.getLoadName) && repairLimit > failedLoadMetadataDetails.size()) {
- val segmentLock = CarbonLockFactory
- .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
- LockUsage.LOCK)
- try {
- if (segmentLock.lockWithRetries(1, 0)) {
- LOGGER
- .info("SIFailedLoadListener: Acquired segment lock on
segment:" +
- loadMetaDetail.getLoadName)
+ val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
+ .ArrayList[LoadMetadataDetails]()
+
+ // read the details of SI table and get all the failed segments during SI
+ // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
+ siTblLoadMetadataDetails.foreach {
+ case loadMetaDetail: LoadMetadataDetails =>
+ val isMainTableLoadValid = checkIfMainTableLoadIsValid(mainTableDetails,
+ loadMetaDetail.getLoadName)
+ if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
+ isMainTableLoadValid && repairLimit > failedLoadMetadataDetails.size()) {
failedLoadMetadataDetails.add(loadMetaDetail)
+ } else if ((loadMetaDetail.getSegmentStatus ==
+ SegmentStatus.INSERT_IN_PROGRESS ||
+ loadMetaDetail.getSegmentStatus ==
+ SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
+ isMainTableLoadValid && repairLimit > failedLoadMetadataDetails.size()) {
+ val segmentLock = CarbonLockFactory
+ .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
+ LockUsage.LOCK)
+ try {
+ if (segmentLock.lockWithRetries(1, 0)) {
+ LOGGER
+ .info("SIFailedLoadListener: Acquired segment lock on
segment:" +
+ loadMetaDetail.getLoadName)
+ failedLoadMetadataDetails.add(loadMetaDetail)
+ }
+ } finally {
+ segmentLock.unlock()
+ LOGGER
+ .info("SIFailedLoadListener: Released segment lock on
segment:" +
+ loadMetaDetail.getLoadName)
+ }
}
- } finally {
- segmentLock.unlock()
- LOGGER
- .info("SIFailedLoadListener: Released segment lock on
segment:" +
- loadMetaDetail.getLoadName)
- }
}
- }
- // check for the skipped segments. compare the main table and SI table table
- // status file and get the skipped segments if any
- CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
- .foreach(metadataDetail => {
- if (repairLimit > failedLoadMetadataDetails.size()) {
- val detail = siTblLoadMetadataDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- val mainTableDetail = mainTableDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- if (null == detail || detail.length == 0) {
- val newDetails = new LoadMetadataDetails
- newDetails.setLoadName(metadataDetail)
- LOGGER.error("Added in SILoadFailedSegment " +
newDetails.getLoadName + " for SI" +
- " table " + indexTableName + "." + carbonTable.getTableName)
- failedLoadMetadataDetails.add(newDetails)
- } else if (detail != null && detail.length != 0 && metadataDetail != null
- && metadataDetail.length != 0) {
- // If SI table has compacted segments and main table does not have
- // compacted segments due to some failure while compaction, need to
- // reload the original segments in this case.
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
- mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
- detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- // in concurrent scenario, if a compaction is going on table, then SI
- // segments are updated first in table status and then the main table
- // segment, so in any load runs parallel this listener shouldn't consider
- // those segments accidentally. So try to take the segment lock.
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
- LockUsage.LOCK)
- if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
- segmentLocks += segmentLockOfProbableOnCompactionSeg
- LOGGER.error("Added in SILoadFailedSegment " +
detail(0).getLoadName + " for SI "
- + "table " + indexTableName + "." +
carbonTable.getTableName)
- failedLoadMetadataDetails.add(detail(0))
+
+ // check for the skipped segments. compare the main table and SI table table
+ // status file and get the skipped segments if any
+ CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
+ .foreach(metadataDetail => {
+ if (repairLimit > failedLoadMetadataDetails.size()) {
+ val detail = siTblLoadMetadataDetails
+ .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+ val mainTableDetail = mainTableDetails
+ .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+ if (null == detail || detail.length == 0) {
+ val newDetails = new LoadMetadataDetails
+ newDetails.setLoadName(metadataDetail)
+ LOGGER.error(
+ "Added in SILoadFailedSegment " + newDetails.getLoadName +
" for SI" +
+ " table " + indexTableName + "." +
carbonTable.getTableName)
+ failedLoadMetadataDetails.add(newDetails)
+ } else if (detail != null && detail.length != 0 && metadataDetail != null
+ && metadataDetail.length != 0) {
+ // If SI table has compacted segments and main table does not have
+ // compacted segments due to some failure while compaction, need to
+ // reload the original segments in this case.
+ if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+ mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+ detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+ // in concurrent scenario, if a compaction is going on table, then SI
+ // segments are updated first in table status and then the main table
+ // segment, so in any load runs parallel this listener shouldn't consider
+ // those segments accidentally. So try to take the segment lock.
+ val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+ LockUsage.LOCK)
+ if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+ segmentLocks += segmentLockOfProbableOnCompactionSeg
+ LOGGER.error(
+ "Added in SILoadFailedSegment " +
detail(0).getLoadName + " for SI "
+ + "table " + indexTableName + "." +
carbonTable.getTableName)
+ failedLoadMetadataDetails.add(detail(0))
+ }
+ }
}
}
+ })
+
+ try {
+ if (!failedLoadMetadataDetails.isEmpty) {
+ // in the case when in SI table a segment is deleted and it's entry is
+ // deleted from the tablestatus file, the corresponding .segment file from
+ // the metadata folder should also be deleted as it contains the
+ // mergefilename which does not exist anymore as the segment is deleted.
+ deleteStaleSegmentFileIfPresent(carbonLoadModel,
+ indexTable,
+ failedLoadMetadataDetails)
+ CarbonIndexUtil
+ .LoadToSITable(sparkSession,
+ carbonLoadModel,
+ indexTableName,
+ isLoadToFailedSISegments = true,
+ indexModel,
+ carbonTable, indexTable, false, failedLoadMetadataDetails)
}
- }
- })
- try {
- if (!failedLoadMetadataDetails.isEmpty) {
- // in the case when in SI table a segment is deleted and it's entry is
- // deleted from the tablestatus file, the corresponding .segment file from
- // the metadata folder should also be deleted as it contains the
- // mergefilename which does not exist anymore as the segment is deleted.
- deleteStaleSegmentFileIfPresent(carbonLoadModel,
- indexTable,
- failedLoadMetadataDetails)
- CarbonIndexUtil
- .LoadToSITable(sparkSession,
- carbonLoadModel,
- indexTableName,
- isLoadToFailedSISegments = true,
- indexModel,
- carbonTable, indexTable, false, failedLoadMetadataDetails)
-
- // get the current load metadata details of the index table
- // details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- }
-
- // get updated main table segments and si table segments
- val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
- val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- // check if main table has load in progress and SI table has no load
- // in progress entry, then no need to enable the SI table
- // Only if the valid segments of maintable match the valid segments of SI
- // table then we can enable the SI for query
- if (CarbonInternalLoaderUtil
- .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
- siTblLoadMetadataDetails)
- && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
- mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
- // enable the SI table if it was disabled earlier due to failure during SI
- // creation time
- sparkSession.sql(
- s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName
SET
- |SERDEPROPERTIES ('isSITableEnabled' =
'true')""".stripMargin).collect()
+ // get updated main table segments and si table segments
+ val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+ // check if main table has load in progress and SI table has no load
+ // in progress entry, then no need to enable the SI table
+ // Only if the valid segments of maintable match the valid segments of SI
+ // table then we can enable the SI for query
+ if (CarbonInternalLoaderUtil
+ .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+ siTblLoadMetadataDetails)
+ && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+ mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
+ // enable the SI table if it was disabled earlier due to failure during SI
+ // creation time
+ sparkSession.sql(
+ s"""ALTER TABLE ${ carbonLoadModel.getDatabaseName
}.$indexTableName SET
+ |SERDEPROPERTIES ('isSITableEnabled' =
'true')""".stripMargin).collect()
+ }
+ } catch {
+ case ex: Exception =>
+ // in case of SI load only for for failed segments, catch the exception, but
+ // do not fail the main table load, as main table segments should be available
+ // for query
+ LOGGER.error(s"Load to SI table to $indexTableName is failed " +
+ s"or SI table ENABLE is failed. ", ex)
+ Seq.empty
+ } finally {
+ segmentLocks.foreach {
+ segmentLock => segmentLock.unlock()
+ }
+ }
}
- } catch {
- case ex: Exception =>
- // in case of SI load only for for failed segments, catch the exception, but
- // do not fail the main table load, as main table segments should be available
- // for query
- LOGGER.error(s"Load to SI table to $indexTableName is failed " +
- s"or SI table ENABLE is failed. ", ex)
- Seq.empty
- } finally {
- segmentLocks.foreach {
- segmentLock => segmentLock.unlock()
+ } else {
+ LOGGER.error(s"Didn't check failed segments for index
[$indexTableName] as compaction " +
+ s"is progress on ${ carbonTable.getTableUniqueName }. " +
+ s"Please call SI repair again")
+ if (!isLoadOrCompaction) {
+ throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+ carbonTable.getTableName, "compaction", "reindex command")
}
}
+ } finally {
+ compactionLock.unlock()
}
Seq.empty
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index a1e6fc1..a7677ea 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
@@ -70,13 +69,11 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
carbonLoadModel.getTableName + " are : " + maxSegmentRepairLimit)
// if there are no index tables for a given fact table do not perform any action
if (indexTables.nonEmpty) {
- val mainTableDetails =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
indexTables.foreach {
indexTableName =>
CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
- indexMetadata, mainTableDetails.toList, secondaryIndexProvider,
- maxSegmentRepairLimit)(sparkSession)
+ indexMetadata, secondaryIndexProvider,
+ maxSegmentRepairLimit, isLoadOrCompaction = true)(sparkSession)
}
}
}