QiangCai commented on a change in pull request #3999:
URL: https://github.com/apache/carbondata/pull/3999#discussion_r515768710



##########
File path: 
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
##########
@@ -101,8 +101,8 @@ public SegmentIndexFileStore(Configuration configuration) {
    * @param segmentPath
    * @throws IOException
    */
-  public void readAllIIndexOfSegment(String segmentPath) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, 
configuration);
+  public void readAllIIndexOfSegment(String segmentPath, String uuid) throws 
IOException {

Review comment:
       Now that the uuid parameter has been added, how about updating the
       method comment or the method name to match?

##########
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -342,33 +336,8 @@ object CarbonDataRDDFactory {
 
     try {
       if (!carbonLoadModel.isCarbonTransactionalTable || 
segmentLock.lockWithRetries()) {
-        if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
-          res = loadDataFrameForUpdate(
-            sqlContext,
-            dataFrame,
-            carbonLoadModel,
-            updateModel,
-            carbonTable,
-            hadoopConf,
-            segmentMetaDataAccumulator)
-          res.foreach { resultOfSeg =>
-            resultOfSeg.foreach { resultOfBlock =>
-              if (resultOfBlock._2._1.getSegmentStatus == 
SegmentStatus.LOAD_FAILURE) {
-                loadStatus = SegmentStatus.LOAD_FAILURE
-                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
-                  updateModel.get.executorErrors.failureCauses = 
FailureCauses.EXECUTOR_FAILURE
-                  updateModel.get.executorErrors.errorMsg = "Failure in the 
Executor."
-                } else {
-                  updateModel.get.executorErrors = resultOfBlock._2._2
-                }
-              } else if (resultOfBlock._2._1.getSegmentStatus ==
-                         SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
-                updateModel.get.executorErrors.failureCauses = 
resultOfBlock._2._2.failureCauses
-                updateModel.get.executorErrors.errorMsg = 
resultOfBlock._2._2.errorMsg
-              }
-            }
-          }
+        if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
+          // if the rowtoupdated is empty, do nothing

Review comment:
       rowtoupdated => rowToUpdated

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
##########
@@ -1243,10 +1256,14 @@ public static void 
updateTableStatusInCaseOfFailure(String loadName,
             SegmentStatusManager.readLoadMetadata(metaDataPath);
         boolean ifTableStatusUpdateRequired = false;
         for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
-          if (loadMetadataDetail.getSegmentStatus() == 
SegmentStatus.INSERT_IN_PROGRESS && loadName
-              .equalsIgnoreCase(loadMetadataDetail.getLoadName())) {
+          if ((loadMetadataDetail.getSegmentStatus() == 
SegmentStatus.INSERT_IN_PROGRESS
+              || loadMetadataDetail.getSegmentStatus() == SegmentStatus.SUCCESS

Review comment:
       1. The updateTableStatusInCaseOfFailure method should not check
          SUCCESS/LOAD_PARTIAL_SUCCESS segments.
       2. Let's keep the code simple; it would be better to change the solution.
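
       A minimal sketch of the simpler check suggested in point 1. `Status` and
       `Load` are hypothetical stand-ins for SegmentStatus and LoadMetadataDetails,
       and marking the entry as MARKED_FOR_DELETE is an assumption about what the
       failure path does. The point is only that SUCCESS and LOAD_PARTIAL_SUCCESS
       entries are never touched:

```java
import java.util.ArrayList;
import java.util.List;

public class FailureStatusSketch {
  // Hypothetical stand-in for SegmentStatus; only the values needed here.
  enum Status { INSERT_IN_PROGRESS, SUCCESS, LOAD_PARTIAL_SUCCESS, MARKED_FOR_DELETE }

  // Hypothetical stand-in for LoadMetadataDetails.
  static final class Load {
    final String name;
    Status status;

    Load(String name, Status status) {
      this.name = name;
      this.status = status;
    }
  }

  // Marks only the in-progress entry of the failed load (assumed behaviour).
  // Returns true if the table status would need to be written back.
  static boolean markFailedLoad(List<Load> loads, String loadName) {
    boolean updateRequired = false;
    for (Load load : loads) {
      if (load.status == Status.INSERT_IN_PROGRESS
          && loadName.equalsIgnoreCase(load.name)) {
        load.status = Status.MARKED_FOR_DELETE;
        updateRequired = true;
      }
    }
    return updateRequired;
  }

  public static void main(String[] args) {
    List<Load> loads = new ArrayList<>();
    loads.add(new Load("0", Status.SUCCESS));
    loads.add(new Load("1", Status.INSERT_IN_PROGRESS));
    System.out.println(markFailedLoad(loads, "1")); // prints true
    System.out.println(loads.get(0).status);        // prints SUCCESS
  }
}
```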

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
         // When the table has too many segemnts, it will take a long time.
         // So moving it to the end and it is outside of locking.
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
+
+        // Delete The New Inserted Segment
+        CarbonLoaderUtil.updateTableStatusInCaseOfFailure(

Review comment:
       1. It would be better to get the loadName from the insertInto command.
       2. Update and insert should update the tablestatus only once, at the end.
          If the insert succeeds but the update has not finished or has failed,
          a concurrent query in that window will return a wrong result.
          This must not break the ACID rules.
       3. Refresh the index cache for the deleted segment only; it should not
          include the updated segment.
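
       To illustrate point 2, here is a rough sketch of staging both halves of
       the update and publishing them in a single step. Everything in it is
       hypothetical: the map is an in-memory stand-in for the tablestatus file,
       and the "UPDATED" label is made up for the example. It is not how
       SegmentStatusManager works, only the shape of the idea:

```java
import java.util.HashMap;
import java.util.Map;

public class SingleCommitSketch {
  // Hypothetical in-memory stand-in for the tablestatus file: segment id -> status label.
  private Map<String, String> committed = new HashMap<>();

  // Readers only ever see the last committed snapshot.
  synchronized Map<String, String> snapshot() {
    return new HashMap<>(committed);
  }

  // Publishes every staged change in one step, so no reader sees a half-applied update.
  synchronized void commit(Map<String, String> staged) {
    Map<String, String> next = new HashMap<>(committed);
    next.putAll(staged);
    committed = next;
  }

  public static void main(String[] args) {
    SingleCommitSketch table = new SingleCommitSketch();
    Map<String, String> initial = new HashMap<>();
    initial.put("0", "SUCCESS");
    table.commit(initial);

    // Stage both halves of the update; nothing is visible to readers yet.
    Map<String, String> staged = new HashMap<>();
    staged.put("1", "SUCCESS"); // new segment holding the rewritten rows
    staged.put("0", "UPDATED"); // hypothetical label: old rows marked as deleted
    System.out.println(table.snapshot()); // prints {0=SUCCESS}

    // One commit at the end makes the insert and the update visible together.
    table.commit(staged);
    System.out.println(table.snapshot().get("0") + " " + table.snapshot().get("1"));
  }
}
```

       If the update fails before the final commit, the staged map is simply
       dropped and a concurrent query keeps reading the old snapshot.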

##########
File path: 
core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -276,14 +278,13 @@ public static boolean 
updateTableMetadataStatus(Set<Segment> updatedSegmentsList
                 SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+          if (isUpdateStatusFileUpdateRequired &&
+              loadMetadata.getLoadName().equalsIgnoreCase("0")) {
+            loadMetadata.setUpdateStatusFileName(
+                CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
+          }
 
           if (isTimestampUpdateRequired) {
-            // we are storing the link between the 2 status files in the 
segment 0 only.

Review comment:
       keep this comment

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
         // When the table has too many segemnts, it will take a long time.
         // So moving it to the end and it is outside of locking.
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
+
+        // Delete The New Inserted Segment
+        CarbonLoaderUtil.updateTableStatusInCaseOfFailure(

Review comment:
       1. It would be better to get the loadName from the insertInto command.
       2. Update and insert should update the tablestatus only once, at the end.
          If the insert succeeds but the update has not finished or has failed,
          a concurrent query in that window will return a wrong result.
          This must not break the ACID rules.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
         // When the table has too many segemnts, it will take a long time.
         // So moving it to the end and it is outside of locking.
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
+
+        // Delete The New Inserted Segment
+        CarbonLoaderUtil.updateTableStatusInCaseOfFailure(

Review comment:
       It would be better to get the loadName from the insertInto command.
   

##########
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -1050,14 +752,15 @@ object CarbonDataRDDFactory {
     }
     var done = true
     // If the updated data should be added as new segment then update the 
segment information
-    if (updateModel.isDefined && updateModel.get.loadAsNewSegment) {
+    if (updateModel.isDefined) {

Review comment:
       Line 756 and line 766 update the tablestatus twice.
       How about merging them into a single update?

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -158,8 +159,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
                 "for the update key")
             }
           }
-          // handle the clean up of IUD.
-          CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)

Review comment:
       If we remove this, we will no longer support the legacy store.
       We need to check first whether it is OK to drop support for the
       legacy store.
   

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
         // When the table has too many segemnts, it will take a long time.
         // So moving it to the end and it is outside of locking.
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)

Review comment:
       Pass the related segments to the cleanStaleDeltaFiles method.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -268,6 +267,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
         // When the table has too many segemnts, it will take a long time.

Review comment:
       How about separating update and horizonCompaction?
       If horizonCompaction fails, the update may already have succeeded.
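
       Roughly what I have in mind, as a sketch only. The class, method, and
       result labels are hypothetical, and reporting a compaction failure
       without undoing the update is an assumption about the desired
       behaviour:

```java
public class UpdateThenCompactSketch {
  // Runs the update first; a failure there propagates and fails the command.
  // The compaction step runs afterwards and cannot undo a finished update.
  static String run(Runnable update, Runnable horizontalCompaction) {
    update.run();
    try {
      horizontalCompaction.run();
      return "UPDATED_AND_COMPACTED";
    } catch (RuntimeException e) {
      // The update is already complete; only the compaction is reported as failed.
      return "UPDATED_COMPACTION_FAILED";
    }
  }

  public static void main(String[] args) {
    String result = run(
        () -> { },
        () -> { throw new IllegalStateException("compaction failed"); });
    System.out.println(result); // prints UPDATED_COMPACTION_FAILED
  }
}
```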
   

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
##########
@@ -158,8 +159,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
                 "for the update key")
             }
           }
-          // handle the clean up of IUD.
-          CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)

Review comment:
       @ajantha-bhat ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

