This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 02efddc  [CARBONDATA-3790] Fix SI table consistency with main table 
segments
02efddc is described below

commit 02efddcb0c1f514e74ccab7bc6204c93e551a0f0
Author: akashrn5 <akashnilu...@gmail.com>
AuthorDate: Thu Apr 30 14:41:10 2020 +0530

    [CARBONDATA-3790] Fix SI table consistency with main table segments
    
    Why is this PR needed?
    Consider a scenario where SI loading happens after the main table load. If, 
while taking the segment lock, we hit an issue, we skip that segment, assuming 
that the missed segments will be loaded in the next load by 
SILoadEventListenerForFailedSegments, but the status of the SI is still enabled. 
However, SILoadEventListenerForFailedSegments loads the skipped segments only if 
the status is disabled. This leads to a segment mismatch between the main and SI 
tables, which may lead to query failure or data mismatch.
    
    What changes were proposed in this PR?
    If it fails to take the segment lock during SI load, the segment is added 
to a skip list. If that list is not empty, the SI is disabled so that 
SILoadEventListenerForFailedSegments will take care of loading the missing ones 
in the next load to the main table.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new test case added?
    No
    
    This closes #3734
---
 .../sql/secondaryindex/rdd/SecondaryIndexCreator.scala     | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index ec63065..04cfbb0 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -91,6 +91,7 @@ object SecondaryIndexCreator {
 
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
     val validSegments: java.util.List[String] = new util.ArrayList[String]()
+    val skippedSegments: java.util.List[String] = new util.ArrayList[String]()
     var validSegmentList = List.empty[String]
 
     try {
@@ -110,6 +111,7 @@ object SecondaryIndexCreator {
           // skipped segments load will be handled in 
SILoadEventListenerForFailedSegments
           validSegments.add(eachSegment)
         } else {
+          skippedSegments.add(eachSegment)
           LOGGER.error(s"Not able to acquire the segment lock for table" +
                        s" ${indexCarbonTable.getTableUniqueName} for segment: 
$eachSegment. " +
                        s"Skipping this segment from loading.")
@@ -321,6 +323,18 @@ object SecondaryIndexCreator {
       segmentLocks.foreach(segmentLock => {
         segmentLock.unlock()
       })
+      // if some segments are skipped, disable the SI table so that
+      // SILoadEventListenerForFailedSegments will take care to load to these 
segments in next
+      // consecutive load to main table.
+      if (!skippedSegments.isEmpty) {
+        secondaryIndexModel.sqlContext.sparkSession.sql(
+          s"""ALTER TABLE ${
+            secondaryIndexModel
+              .carbonLoadModel
+              .getDatabaseName
+          }.${ secondaryIndexModel.secondaryIndex.indexName } SET
+             |SERDEPROPERTIES ('isSITableEnabled' = 
'false')""".stripMargin).collect()
+      }
       try {
         if (!isCompactionCall) {
           SegmentStatusManager

Reply via email to