kunal642 commented on a change in pull request #4148:
URL: https://github.com/apache/carbondata/pull/4148#discussion_r678009427



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
##########
@@ -198,54 +420,38 @@ case class CarbonMergeDataSetCommand(
       val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath)
       val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
       FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
-      if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString, false, false)) {
-        LOGGER.error("writing of update status file failed")
-        throw new CarbonMergeDataSetException("writing of update status file failed")
-      }
+      MergeUtil.updateSegmentStatusAfterUpdateOrDelete(targetCarbonTable,
+        trxMgr.getLatestTrx, tuple)
       Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
         executorErrors, tuple._2, Option.empty))
     } else {
       None
     }
 
     val dataFrame = loadDF.select(tableCols.map(col): _*)
-    CarbonInsertIntoCommand(databaseNameOp = Some(carbonTable.getDatabaseName),
-      tableName = carbonTable.getTableName,
-      options = Map("fileheader" -> header),
-      isOverwriteTable = false,
-      dataFrame.queryExecution.logical,
-      carbonTable.getTableInfo,
-      Map.empty,
-      Map.empty,
-      new OperationContext,
-      updateTableModel
-    ).run(sparkSession)
+    MergeUtil.insertDataToTargetTable(sparkSession,
+      targetCarbonTable,
+      header,
+      updateTableModel,
+      dataFrame)
 
     if (hasDelAction && count == 0) {
-      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
-        .getTableStatusFilePath(carbonTable.getTablePath))
-      CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
-        new Segment(loadMetadataDetail.getMergedLoadName,
-          loadMetadataDetail.getSegmentFile)).toSet.asJava,
-        carbonTable,
-        trxMgr.getLatestTrx.toString,
-        true,
-        true, new util.ArrayList[Segment]())
+      MergeUtil.updateStatusIfJustDeleteOperation(targetCarbonTable, trxMgr.getLatestTrx)
     }
     LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
     LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
     LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
     LOGGER.info(
       " Time taken to merge data  :: " + (System.currentTimeMillis() - st))
 
-  // Load the history table if the insert history table action is added by user.
-    HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, carbonTable,
+    // Load the history table if the insert history table action is added by user.

Review comment:
       Please check whether the code style (e.g., the indentation of this comment line) is wrong here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to