akashrn5 commented on a change in pull request #4015:
URL: https://github.com/apache/carbondata/pull/4015#discussion_r532701657



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -299,6 +300,13 @@ object CarbonIndexUtil {
         .Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
     }
     val header = indexTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    if (isInsertOverWrite) {
+      val loadMetadataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+      for (loadMetadata <- loadMetadataDetails) {

Review comment:
       Do not use the traditional way of looping; use the Scala functional way, .foreach.
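
       A minimal sketch of the .foreach form (loadMetadataDetails is the buffer
       from the diff; the loop body is truncated above, so it is left as a
       placeholder comment):

           loadMetadataDetails.foreach { loadMetadata =>
             // existing loop body, unchanged
           }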

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
##########
@@ -79,13 +78,16 @@ class SILoadEventListener extends OperationEventListener with Logging {
                  .lookupRelation(Some(carbonLoadModel.getDatabaseName),
                    indexTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
 
+                val isInsertOverwrite = (operationContext.getProperties

Review comment:
       Remove the unnecessary brackets.
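
       For illustration only (the full right-hand side is truncated in the
       diff, so the expression is shown as a placeholder):

           // instead of: val isInsertOverwrite = (<expression>)
           // write:      val isInsertOverwrite = <expression>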

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -385,7 +392,26 @@ object SecondaryIndexCreator {
         val rebuiltSegments = SecondaryIndexUtil
           .mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
             indexCarbonTable,
-            loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+            loadMetadataDetail.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+        if (isInsertOverwrite) {
+          var segmentList = new ListBuffer[String]()

Review comment:
       Rename this to something more meaningful that says what the list does; or is it required at all?
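
       One possible rename (the name is only a suggestion): these appear to be
       the pre-overwrite segments handled below, and since ListBuffer is
       already mutable, val is enough:

           val segmentsToInvalidate = new ListBuffer[String]()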

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -385,7 +392,26 @@ object SecondaryIndexCreator {
         val rebuiltSegments = SecondaryIndexUtil
           .mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
             indexCarbonTable,
-            loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+            loadMetadataDetail.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+        if (isInsertOverwrite) {
+          var segmentList = new ListBuffer[String]()
+          for (loadMetadata <- loadMetadataDetails) {
+            if (loadMetadata.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+              segmentList += loadMetadata.getLoadName
+            }
+          }
+          if (segmentList.nonEmpty) {

Review comment:
       Add a comment here explaining this block.
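
       Hypothetical wording for the requested comment:

           // The pre-overwrite segments collected above still carry their old
           // status; handle them here so queries do not see stale data next to
           // the new insert-overwrite segment.
           if (segmentList.nonEmpty) {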

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -385,7 +392,26 @@ object SecondaryIndexCreator {
         val rebuiltSegments = SecondaryIndexUtil
           .mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
             indexCarbonTable,
-            loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+            loadMetadataDetail.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+        if (isInsertOverwrite) {
+          var segmentList = new ListBuffer[String]()
+          for (loadMetadata <- loadMetadataDetails) {

Review comment:
       Same as above: use .foreach.
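
       A hedged sketch of the functional rewrite (identifiers from the diff
       assumed in scope); a single filter/map pass also removes the mutable
       buffer entirely:

           val segmentList = loadMetadataDetails
             .filter(_.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
             .map(_.getLoadName)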

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -371,11 +377,12 @@ object SecondaryIndexCreator {
 
         val loadMetadataDetails = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
-          .filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
 
+        val loadMetadataDetail = loadMetadataDetails

Review comment:
       Better to rename it; loadMetadataDetail differs from loadMetadataDetails by only one letter, which will be confusing.
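
       A hedged suggestion (the identifier name is hypothetical): since this is
       the subset of loadMetadataDetails restricted to the successful SI
       segments, a plural, descriptive name avoids the one-letter difference:

           val successSILoadMetadataDetails = loadMetadataDetails
             .filter(detail => successSISegments.contains(detail.getLoadName))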

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
##########
@@ -385,7 +392,26 @@ object SecondaryIndexCreator {
         val rebuiltSegments = SecondaryIndexUtil
           .mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
             indexCarbonTable,
-            loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+            loadMetadataDetail.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
+        if (isInsertOverwrite) {
+          var segmentList = new ListBuffer[String]()
+          for (loadMetadata <- loadMetadataDetails) {
+            if (loadMetadata.getSegmentStatus != SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+              segmentList += loadMetadata.getLoadName
+            }
+          }
+          if (segmentList.nonEmpty) {

Review comment:
       Please change the status to MARKED_FOR_DELETE only after the new insert-overwrite segment is marked SUCCESS; otherwise we may hit reliability issues in concurrent scenarios.
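
       A minimal sketch of the suggested ordering (variable names are
       hypothetical; the SegmentStatus values and setSegmentStatus are existing
       CarbonData APIs). Committing the new segment first means a concurrent
       reader never observes a state with no valid data:

           // 1. flip the freshly written insert-overwrite segment to SUCCESS
           newOverwriteSegmentDetail.setSegmentStatus(SegmentStatus.SUCCESS)
           // ... persist the table status file here ...
           // 2. only then mark the overwritten segments for delete
           overwrittenSegmentDetails.foreach(
             _.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE))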




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

