[CARBONDATA-1972][PARTITION] Fix compaction after update of partition table.

When an update rewrites all of the data, every old segment needs to be
marked for delete, but this was not happening for partition tables. This PR fixes it.

This closes #1752


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cdf2c029
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cdf2c029
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cdf2c029

Branch: refs/heads/branch-1.3
Commit: cdf2c0297f441b1f6070f076d3b67c9acaf86da8
Parents: 3c8031b
Author: ravipesala <[email protected]>
Authored: Wed Jan 3 09:16:36 2018 +0530
Committer: Jacky Li <[email protected]>
Committed: Fri Jan 5 19:36:26 2018 +0800

----------------------------------------------------------------------
 .../hadoop/api/CarbonOutputCommitter.java       | 15 ++++++++----
 .../hadoop/api/CarbonTableOutputFormat.java     |  8 +++++++
 ...andardPartitionTableCompactionTestCase.scala | 15 +++++++++++-
 .../command/carbonTableSchemaCommon.scala       |  3 ++-
 .../management/CarbonLoadDataCommand.scala      |  3 +++
 .../CarbonProjectForDeleteCommand.scala         | 25 ++++++++++----------
 .../CarbonProjectForUpdateCommand.scala         | 10 ++++----
 .../command/mutation/DeleteExecution.scala      | 17 +++++++------
 .../datasources/CarbonFileFormat.scala          | 14 +++++++----
 9 files changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 97d5a7f..d0a5fd9 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -18,9 +18,7 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -100,12 +98,19 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
       mergeCarbonIndexFiles(segmentPath);
       String updateTime =
           
context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
+      String segmentsToBeDeleted =
+          
context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, 
"");
+      List<String> segmentDeleteList = 
Arrays.asList(segmentsToBeDeleted.split(","));
       if (updateTime != null) {
         Set<String> segmentSet = new HashSet<>(
             new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
                 .getValidAndInvalidSegments().getValidSegments());
-        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, 
updateTime, true,
-            new ArrayList<String>());
+        CarbonUpdateUtil.updateTableMetadataStatus(
+            segmentSet,
+            carbonTable,
+            updateTime,
+            true,
+            segmentDeleteList);
       }
     } else {
       CarbonLoaderUtil.updateTableStatusForFailure(loadModel);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index bd70e41..897c929 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -95,6 +95,14 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Stri
    */
   public static final String UPADTE_TIMESTAMP = 
"mapreduce.carbontable.update.timestamp";
 
+  /**
+   * During an update query the old data is deleted first and the updated data is then added
+   * to a new segment, so an entire segment can end up being removed by the delete step.
+   * Such segments must be marked for delete when the table status is updated.
+   */
+  public static final String SEGMENTS_TO_BE_DELETED =
+      "mapreduce.carbontable.segments.to.be.removed";
+
   private static final Log LOG = 
LogFactory.getLog(CarbonTableOutputFormat.class);
 
   private CarbonOutputCommitter committer;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 3e6cd26..298b793 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -193,7 +193,19 @@ class StandardPartitionTableCompactionTestCase extends 
QueryTest with BeforeAndA
       .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
   }
 
-  override def afterAll = {
+  test("Compaction after update in partition table") {
+    sql("create table compactionupdatepartition (imei 
string,deviceInformationId int,MAC string,deviceColor string,device_backColor 
string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit 
string,CPIClocked string,series string,productionDate timestamp,bomCode 
string,internalModels string, deliveryTime string, channelsId string, 
channelsName string , deliveryAreaId string, deliveryCountry string, 
deliveryProvince string, deliveryCity string,deliveryDistrict string, 
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, 
ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity 
string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, 
Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion 
string, Active_BacVerNumber string, Active_BacFlashVer string, 
Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, 
Active_phonePADPa
 rtitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY 
Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country 
string, Latest_province string, Latest_city string, Latest_district string, 
Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, 
Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer 
string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, 
Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, 
Latest_phonePADPartitionedVersions string, Latest_operatorId string, 
gamePointDescription string,gamePointId double) partitioned by(contractNumber 
BigInt) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE 
compactionupdatepartition options ('DELIMITER'=',', 'QUOTECHAR'='', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 
Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE 
compactionupdatepartition options ('DELIMITER'=',', 'QUOTECHAR'='', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 
Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE 
compactionupdatepartition options ('DELIMITER'=',', 'QUOTECHAR'='', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 
Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE 
compactionupdatepartition options ('DELIMITER'=',', 'QUOTECHAR'='', 
'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,
 
Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')""")
+    sql(s"""insert into compactionupdatepartition select * from 
compactionupdatepartition""")
+    sql("update compactionupdatepartition set(AMSize)=('8RAM size')").show()
+    sql("delete from compactionupdatepartition where AMSize ='8RAM 
size'").show()
+    sql(s"""alter table compactionupdatepartition compact 'major'""").collect
+  }
+
+    override def afterAll = {
     dropTable
   }
 
@@ -206,6 +218,7 @@ class StandardPartitionTableCompactionTestCase extends 
QueryTest with BeforeAndA
     sql("drop table if exists partitionmajor")
     sql("drop table if exists staticpartition")
     sql("drop table if exists staticpartitioncompaction")
+    sql("drop table if exists compactionupdatepartition")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index b76cfcf..37eea60 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -132,7 +132,8 @@ case class AlterTableModel(
 case class UpdateTableModel(
     isUpdate: Boolean,
     updatedTimeStamp: Long,
-    var executorErrors: ExecutionErrors)
+    var executorErrors: ExecutionErrors,
+    deletedSegments: Seq[String])
 
 case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 4c1e748..01bb5b3 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -711,6 +711,9 @@ case class CarbonLoadDataCommand(
     options ++= this.options
     if (updateModel.isDefined) {
       options += (("updatetimestamp", 
updateModel.get.updatedTimeStamp.toString))
+      if (updateModel.get.deletedSegments.nonEmpty) {
+        options += (("segmentsToBeDeleted", 
updateModel.get.deletedSegments.mkString(",")))
+      }
     }
     val hdfsRelation = HadoopFsRelation(
       location = catalog,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index e54ea9e..7022566 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -70,28 +70,27 @@ private[sql] case class CarbonProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      if (DeleteExecution.deleteDeltaExecution(
+      DeleteExecution.deleteDeltaExecution(
         databaseNameOp,
         tableName,
         sparkSession,
         dataRdd,
         timestamp,
         isUpdateOperation = false,
-        executorErrors)) {
-        // call IUD Compaction.
-        HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
-          isUpdateOperation = false)
+        executorErrors)
+      // call IUD Compaction.
+      HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
+        isUpdateOperation = false)
 
 
-        if (executorErrors.failureCauses != FailureCauses.NONE) {
-          throw new Exception(executorErrors.errorMsg)
-        }
-
-        // trigger post event for Delete from table
-        val deleteFromTablePostEvent: DeleteFromTablePostEvent =
-          DeleteFromTablePostEvent(sparkSession, carbonTable)
-        OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, 
operationContext)
+      if (executorErrors.failureCauses != FailureCauses.NONE) {
+        throw new Exception(executorErrors.errorMsg)
       }
+
+      // trigger post event for Delete from table
+      val deleteFromTablePostEvent: DeleteFromTablePostEvent =
+        DeleteFromTablePostEvent(sparkSession, carbonTable)
+      OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, 
operationContext)
     } catch {
       case e: HorizontalCompactionException =>
         LOGGER.error("Delete operation passed. Exception in Horizontal 
Compaction." +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 75008ad..bd53a66 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -91,7 +91,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
       // do delete operation.
-      DeleteExecution.deleteDeltaExecution(
+      val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
         databaseNameOp,
         tableName,
         sparkSession,
@@ -111,7 +111,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
         plan,
         sparkSession,
         currentTime,
-        executionErrors)
+        executionErrors,
+        segmentsToBeDeleted)
 
       if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
@@ -165,7 +166,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       plan: LogicalPlan,
       sparkSession: SparkSession,
       currentTime: Long,
-      executorErrors: ExecutionErrors): Unit = {
+      executorErrors: ExecutionErrors,
+      deletedSegments: Seq[String]): Unit = {
 
     def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): 
Boolean = {
       val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -209,7 +211,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
       case _ => sys.error("")
     }
 
-    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
+    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors, 
deletedSegments)
 
     val header = getHeader(carbonRelation, plan)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 00d6657..994880c 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -51,6 +51,10 @@ import org.apache.carbondata.spark.DeleteDelataResultImpl
 object DeleteExecution {
   val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getName)
 
+  /**
+   * Generates the delete delta files in each segment according to the given RDD.
+   * @return the segments that must be marked for delete; empty if there are none.
+   */
   def deleteDeltaExecution(
       databaseNameOp: Option[String],
       tableName: String,
@@ -58,7 +62,7 @@ object DeleteExecution {
       dataRdd: RDD[Row],
       timestamp: String,
       isUpdateOperation: Boolean,
-      executorErrors: ExecutionErrors): Boolean = {
+      executorErrors: ExecutionErrors): Seq[String] = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, 
ExecutionErrors))]] = null
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -66,8 +70,8 @@ object DeleteExecution {
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val factPath = carbonTablePath.getFactDir
+    var segmentsTobeDeleted = Seq.empty[String]
 
-    var deleteStatus = true
     val deleteRdd = if (isUpdateOperation) {
       val schema =
         
org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
@@ -92,7 +96,7 @@ object DeleteExecution {
 
     // if no loads are present then no need to do anything.
     if (keyRdd.partitions.length == 0) {
-      return true
+      return segmentsTobeDeleted
     }
     val blockMappingVO =
       carbonInputFormat.getBlockRowCount(
@@ -132,7 +136,7 @@ object DeleteExecution {
 
     // if no loads are present then no need to do anything.
     if (res.isEmpty) {
-      return true
+      return segmentsTobeDeleted
     }
 
     // update new status file
@@ -154,7 +158,6 @@ object DeleteExecution {
             }
           }
           else {
-            deleteStatus = false
             // In case of failure , clean all related delete delta files
             CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
             LOGGER.audit(s"Delete data operation is failed for ${ database 
}.${ tableName }")
@@ -179,7 +182,7 @@ object DeleteExecution {
       val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
         
.getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
 
-
+      segmentsTobeDeleted = listOfSegmentToBeMarkedDeleted.asScala
 
       // this is delete flow so no need of putting timestamp in the status 
file.
       if (CarbonUpdateUtil
@@ -310,7 +313,7 @@ object DeleteExecution {
       resultIter
     }
 
-    true
+    segmentsTobeDeleted
   }
 
   private def createCarbonInputFormat(absoluteTableIdentifier: 
AbsoluteTableIdentifier) :

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdf2c029/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 7f05cef..15deae1 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -109,15 +109,21 @@ with Serializable {
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
     // Set the update timestamp if user sets in case of update query. It needs 
to be updated
     // in load status update time
-    val updateTimeStamp = options.getOrElse("updatetimestamp", null)
-    if (updateTimeStamp != null) {
-      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp)
-      model.setFactTimeStamp(updateTimeStamp.toLong)
+    val updateTimeStamp = options.get("updatetimestamp")
+    if (updateTimeStamp.isDefined) {
+      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+      model.setFactTimeStamp(updateTimeStamp.get.toLong)
     }
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {
       conf.set("carbon.staticpartition", staticPartition)
     }
+    // An update query may remove older segments entirely, so pass the segments to be deleted
+    // along so that they are marked for delete when the table status is updated.
+    val segemntsTobeDeleted = options.get("segmentsToBeDeleted")
+    if (segemntsTobeDeleted.isDefined) {
+      conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, 
segemntsTobeDeleted.get)
+    }
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {

Reply via email to