This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d4b0709  [CARBONDATA-3885] [CARBONDATA-3884] Fix for load failures with isSIenabled = false and fix for concurrent load failure
d4b0709 is described below

commit d4b0709cf8c4f20ad61114d69022f15663b84915
Author: Vikram Ahuja <[email protected]>
AuthorDate: Wed Jun 24 12:16:42 2020 +0530

    [CARBONDATA-3885] [CARBONDATA-3884] Fix for load failures with isSIenabled = false and fix for concurrent load failure
    
    Why is this PR needed?
    Issue 1:
    In some concurrent scenarios, certain segment folders of the SI table were found to be deleted. When such a
    segment was inserted back into the SI table during a new load to the main table with isSIenabled = false,
    the load to the SI table failed.
    
    Root Cause: When a segment folder is deleted and its entry is manually removed from the table status file,
    a new load to SI changes the table status to InsertInProgress while reloading the previously deleted segments.
    At that point the stale .segment file still exists and points to a merge index file that no longer exists
    (because the segment folder was deleted). Since that merge index file is missing, the load to the SI table
    fails.
    
    Issue 2:
    In some concurrent load scenarios on a main table with an SI table and isSITableEnabled = false, the load to
    the SI table fails.
    
    Root Cause: When concurrent loading is done with isSITableEnabled = false, the main table details are read
    earlier in the code and the SI table details are read later. In some scenarios, another concurrent load to the
    main table finishes in between and changes the number of segments in the main table, so the number of segments
    in the SI table no longer matches the number in the main table. Because of this mismatch, the load can fail
    when the validmaintableload method is called.
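    The effect of this mismatch on segment validation can be sketched as follows. This is a minimal, hypothetical
    Java illustration, not CarbonData code. `positionalMatch` stands in for the old check in CarbonInternalLoaderUtil,
    which required equal list sizes and compared sorted segment IDs position by position. `containmentMatch` stands
    in for the new check, which only requires that every main table segment also exists in the SI table. Segment IDs
    are plain strings, and the class and method names are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SegmentMatchSketch {

    // Old-style check (hypothetical name): sizes must be equal and sorted IDs must match position by position.
    static boolean positionalMatch(List<String> mainSegments, List<String> siSegments) {
        List<String> main = new ArrayList<>(mainSegments);
        List<String> si = new ArrayList<>(siSegments);
        Collections.sort(main);
        Collections.sort(si);
        if (si.size() != main.size()) {
            return false;
        }
        for (int i = 0; i < si.size(); i++) {
            if (!si.get(i).equalsIgnoreCase(main.get(i))) {
                return false;
            }
        }
        return true;
    }

    // Containment check (hypothetical name): extra SI segments are tolerated,
    // but every main table segment must be present in the SI table.
    static boolean containmentMatch(List<String> mainSegments, List<String> siSegments) {
        if (siSegments.size() < mainSegments.size()) {
            return false;
        }
        Set<String> siSet = new HashSet<>(siSegments);
        for (String segment : mainSegments) {
            if (!siSet.contains(segment)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The main table snapshot was read before a concurrent load added segment "3".
        List<String> mainSnapshot = Arrays.asList("0", "1", "2");
        List<String> siSegments = Arrays.asList("0", "1", "2", "3");
        System.out.println(positionalMatch(mainSnapshot, siSegments));  // false
        System.out.println(containmentMatch(mainSnapshot, siSegments)); // true
    }
}
```

    Suppose the main table snapshot holds 0, 1 and 2, and a concurrent load then adds segment 3 to the SI table.
    The positional check rejects the SI segment list, while the containment check accepts it.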
    
    What changes were proposed in this PR?
    Solution for Issue 1: Whenever isSIenabled is set to false and previously deleted SI segments are loaded back
    into the SI table, the stale .segment file is deleted from the metadata folder if it exists. As a result, a new
    .segment file is created during the load, pointing to the new index file path.
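    Below is a minimal sketch of the Issue 1 cleanup, assuming a local filesystem. It uses java.nio.file instead of
    CarbonData's FileFactory. The class name, method name and segment file names are hypothetical. For each failed
    load about to be reloaded, the sketch resolves the .segment file recorded for that load in the segment files
    directory and deletes it if present, so the reload writes a fresh .segment file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StaleSegmentFileCleaner {

    // Deletes the recorded .segment file of every failed load (hypothetical helper).
    // segmentFileByLoad maps a load name to the segment file name kept in the table status.
    static List<Path> deleteStaleSegmentFiles(Path segmentFilesDir,
                                              Map<String, String> segmentFileByLoad,
                                              Collection<String> failedLoadNames) throws IOException {
        List<Path> deleted = new ArrayList<>();
        for (String loadName : failedLoadNames) {
            String segmentFile = segmentFileByLoad.get(loadName);
            if (segmentFile == null) {
                continue; // no segment file recorded for this load
            }
            Path path = segmentFilesDir.resolve(segmentFile);
            // deleteIfExists returns false (and does nothing) when the file is already gone
            if (Files.deleteIfExists(path)) {
                deleted.add(path);
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("segments");
        Files.createFile(dir.resolve("2_1593000000000.segment"));
        Map<String, String> segmentFileByLoad = new HashMap<>();
        segmentFileByLoad.put("2", "2_1593000000000.segment");
        segmentFileByLoad.put("3", "3_1593000000001.segment"); // never created on disk
        List<Path> deleted = deleteStaleSegmentFiles(dir, segmentFileByLoad, Arrays.asList("2", "3"));
        System.out.println(deleted.size()); // 1
        Files.delete(dir); // the directory is empty again at this point
    }
}
```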
    
    Solution for Issue 2: Whenever isSIenabled is set to false and previously deleted SI segments are being
    loaded, check again, just before adding segments to SIFailedSegmentList, whether the load name is present in
    the main table. A segment is added to SIFailedSegmentList only if its load name is present in the main table
    and it is in SUCCESS state.
    
    This closes #3802
---
 .../SILoadEventListenerForFailedSegments.scala     | 57 +++++++++++++++++++---
 .../load/CarbonInternalLoaderUtil.java             | 33 ++++++++++++-
 2 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 74f2f53..cae33ed 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -32,13 +32,15 @@ import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 /**
  * This Listener is to load the data to failed segments of Secondary index table(s)
@@ -149,15 +151,35 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                     .foreach(metadataDetail => {
                       val detail = details
                         .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                      val mainTableDetail = mainTableDetails
+                        .filter(metadata => metadata.getLoadName.equals(metadataDetail))
                       if (null == detail || detail.length == 0) {
                         val newDetails = new LoadMetadataDetails
                         newDetails.setLoadName(metadataDetail)
                         LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName)
                         failedLoadMetadataDetails.add(newDetails)
+                      } else if (detail != null && detail.length !=0 && metadataDetail != null
+                                 && metadataDetail.length != 0) {
+                        // If SI table has compacted segments and main table does not have
+                        // compacted segments due to some failure while compaction, need to
+                        // reload the original segments in this case.
+                        if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                            mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                          detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                          LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
+                          failedLoadMetadataDetails.add(detail(0))
+                        }
                       }
                     })
                   try {
                     if (!failedLoadMetadataDetails.isEmpty) {
+                      // in the case when in SI table a segment is deleted and it's entry is
+                      // deleted from the tablestatus file, the corresponding .segment file from
+                      // the metadata folder should also be deleted as it contains the
+                      // mergefilename which does not exist anymore as the segment is deleted.
+                      deleteStaleSegmentFileIfPresent(carbonLoadModel,
+                        indexTable,
+                        failedLoadMetadataDetails)
                       CarbonIndexUtil
                         .LoadToSITable(sparkSession,
                           carbonLoadModel,
@@ -209,14 +231,37 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
 
   def checkIfMainTableLoadIsValid(mainTableDetails: Array[LoadMetadataDetails],
     loadName: String): Boolean = {
+    // in concurrent scenarios there can be cases when loadName is not present in the
+    // mainTableDetails array. Added a check to see if the loadName is even present in the
+    // mainTableDetails.
     val mainTableLoadDetail = mainTableDetails
-      .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName)).head
-    if (mainTableLoadDetail.getSegmentStatus ==
-        SegmentStatus.MARKED_FOR_DELETE ||
-        mainTableLoadDetail.getSegmentStatus == SegmentStatus.COMPACTED) {
+      .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName))
+    if (mainTableLoadDetail.length == 0) {
       false
     } else {
-      true
+      if (mainTableLoadDetail.head.getSegmentStatus ==
+        SegmentStatus.MARKED_FOR_DELETE ||
+        mainTableLoadDetail.head.getSegmentStatus == SegmentStatus.COMPACTED) {
+        false
+      } else {
+        true
+      }
     }
   }
+
+  def deleteStaleSegmentFileIfPresent(carbonLoadModel: CarbonLoadModel, indexTable: CarbonTable,
+    failedLoadMetaDataDetails: java.util.List[LoadMetadataDetails]): Unit = {
+    failedLoadMetaDataDetails.asScala.map(failedLoadMetaData => {
+      carbonLoadModel.getLoadMetadataDetails.asScala.map(loadMetaData => {
+        if (failedLoadMetaData.getLoadName == loadMetaData.getLoadName) {
+          val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
+            CarbonCommonConstants.FILE_SEPARATOR + loadMetaData.getSegmentFile
+          if (FileFactory.isFileExist(segmentFilePath)) {
+            // delete the file if it exists
+            FileFactory.deleteFile(segmentFilePath)
+          }
+        }
+      })
+    })
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index b4066fc..9a16c4a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -119,6 +120,24 @@ public class CarbonInternalLoaderUtil {
           updatedLoadMetadataDetails.add(currentLoadMetadataDetails[i]);
         }
 
+        // check if newLoadMetadataDetail has segments which are not in  currentLoadMetaDetails
+        // and add them to the updatedLoadMetadataDetails
+        boolean foundNext = false;
+        for (int i = 0; i < newLoadMetadataDetails.size(); i++) {
+          foundNext = false;
+          for (int j = 0; j < currentLoadMetadataDetails.length; j++) {
+            if (newLoadMetadataDetails.get(i).getLoadName().equals(currentLoadMetadataDetails[j].getLoadName())) {
+              foundNext = true;
+              break;
+            }
+            if (j == currentLoadMetadataDetails.length - 1 && !foundNext) {
+              // if not found in the list then add it
+              updatedLoadMetadataDetails.add(newLoadMetadataDetails.get(i));
+              found = true;
+            }
+          }
+        }
+
         // when data load is done for first time, add all the details
         if (currentLoadMetadataDetails.length == 0 || !found) {
           updatedLoadMetadataDetails.addAll(newLoadMetadataDetails);
@@ -313,11 +332,21 @@ public class CarbonInternalLoaderUtil {
         getListOfValidSlices(siTableLoadMetadataDetails);
     Collections.sort(mainTableSegmentsList);
     Collections.sort(indexList);
-    if (indexList.size() != mainTableSegmentsList.size()) {
+    // In the case when number of SI segments are more than the maintable segments do nothing
+    // and proceed to process the segments. Return False in case if maintable segments are more
+    // than SI Segments
+    if (indexList.size() < mainTableSegmentsList.size()) {
       return false;
     }
+    // There can be cases when the number of segments in the main table are less than the index
+    // table. In this case mapping all the segments in main table to SI table.
+    // Return False if a segment in maintable is not in indextable
+    HashSet<String> indexTableSet = new HashSet<String>();
     for (int i = 0; i < indexList.size(); i++) {
-      if (!indexList.get(i).equalsIgnoreCase(mainTableSegmentsList.get(i))) {
+      indexTableSet.add(indexList.get(i));
+    }
+    for (int i = 0; i < mainTableSegmentsList.size(); i++) {
+      if (!indexTableSet.contains(mainTableSegmentsList.get(i))) {
         return false;
       }
     }
