This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 02efddc [CARBONDATA-3790] Fix SI table consistency with main table
segments
02efddc is described below
commit 02efddcb0c1f514e74ccab7bc6204c93e551a0f0
Author: akashrn5 <[email protected]>
AuthorDate: Thu Apr 30 14:41:10 2020 +0530
[CARBONDATA-3790] Fix SI table consistency with main table segments
Why is this PR needed?
Consider a scenario where SI loading happens after the main table load: if
we fail to take the segment lock, we hit an issue and skipped the segment, assuming that the
missed segments will be loaded in next load by
SILoadEventListenerForFailedSegments, but the status of SI is still enabled.
But SILoadEventListenerForFailedSegments will only load to skipped segments if
the status is disabled which will lead to segment mismatch between main and SI
table, which may lead to query failure or data mismatch.
What changes were proposed in this PR?
If it fails to take the segment lock during SI load, add the segment to a skip list; if
that list is not empty, mark the SI as disabled, so that
SILoadEventListenerForFailedSegments will take care to load the missing ones in
next load to the main table.
Does this PR introduce any user interface change?
No
Is any new test case added?
No
This closes #3734
---
.../sql/secondaryindex/rdd/SecondaryIndexCreator.scala | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index ec63065..04cfbb0 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -91,6 +91,7 @@ object SecondaryIndexCreator {
var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
val validSegments: java.util.List[String] = new util.ArrayList[String]()
+ val skippedSegments: java.util.List[String] = new util.ArrayList[String]()
var validSegmentList = List.empty[String]
try {
@@ -110,6 +111,7 @@ object SecondaryIndexCreator {
// skipped segments load will be handled in
SILoadEventListenerForFailedSegments
validSegments.add(eachSegment)
} else {
+ skippedSegments.add(eachSegment)
LOGGER.error(s"Not able to acquire the segment lock for table" +
s" ${indexCarbonTable.getTableUniqueName} for segment:
$eachSegment. " +
s"Skipping this segment from loading.")
@@ -321,6 +323,18 @@ object SecondaryIndexCreator {
segmentLocks.foreach(segmentLock => {
segmentLock.unlock()
})
+ // if some segments are skipped, disable the SI table so that
+ // SILoadEventListenerForFailedSegments will take care to load to these
segments in next
+ // consecutive load to main table.
+ if (!skippedSegments.isEmpty) {
+ secondaryIndexModel.sqlContext.sparkSession.sql(
+ s"""ALTER TABLE ${
+ secondaryIndexModel
+ .carbonLoadModel
+ .getDatabaseName
+ }.${ secondaryIndexModel.secondaryIndex.indexName } SET
+ |SERDEPROPERTIES ('isSITableEnabled' =
'false')""".stripMargin).collect()
+ }
try {
if (!isCompactionCall) {
SegmentStatusManager