Repository: carbondata
Updated Branches:
  refs/heads/master f1c6dddec -> a89587e70


[CARBONDATA-1908][PARTITION] Support UPDATE/DELETE on partition tables.

This closes #1681


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a89587e7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a89587e7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a89587e7

Branch: refs/heads/master
Commit: a89587e70dddd55b44050423810776edaad26f0b
Parents: f1c6ddd
Author: ravipesala <[email protected]>
Authored: Tue Dec 19 17:43:00 2017 +0530
Committer: Venkata Ramana G <[email protected]>
Committed: Thu Dec 21 16:39:09 2017 +0530

----------------------------------------------------------------------
 .../hadoop/api/CarbonOutputCommitter.java       |  30 +++-
 .../hadoop/api/CarbonTableInputFormat.java      |   3 +-
 .../hadoop/api/CarbonTableOutputFormat.java     |  13 +-
 .../iud/DeleteCarbonTableTestCase.scala         |  21 +++
 .../iud/UpdateCarbonTableTestCase.scala         |  68 +++++++++
 .../carbondata/spark/util/CommonUtil.scala      |   2 +-
 .../management/CarbonLoadDataCommand.scala      |  23 ++-
 ...rbonAlterTableDropHivePartitionCommand.scala | 150 +++++++++++++++++++
 .../CarbonStandardAlterTableDropPartition.scala | 150 -------------------
 .../datasources/CarbonFileFormat.scala          |   9 +-
 .../sql/execution/strategy/DDLStrategy.scala    |   4 +-
 .../loading/CarbonDataLoadConfiguration.java    |  11 ++
 .../loading/DataLoadProcessBuilder.java         |   6 +-
 .../iterator/CarbonOutputIteratorWrapper.java   |  61 +++++---
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +
 .../store/CarbonFactDataHandlerModel.java       |   8 +
 .../processing/util/CarbonLoaderUtil.java       |  14 +-
 17 files changed, 384 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 08fd1ac..6f5d0e4 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -18,12 +18,18 @@
 package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.PartitionMapFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -76,18 +82,32 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
     super.commitJob(context);
     boolean overwriteSet = 
CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
     CarbonLoadModel loadModel = 
CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    LoadMetadataDetails newMetaEntry = 
loadModel.getCurrentLoadMetadataDetail();
     String segmentPath =
         CarbonTablePath.getSegmentPath(loadModel.getTablePath(), 
loadModel.getSegmentId());
     // Merge all partition files into a single file.
     new PartitionMapFileStore().mergePartitionMapFiles(segmentPath,
         loadModel.getFactTimeStamp() + "");
-    LoadMetadataDetails newMetaEntry = 
loadModel.getCurrentLoadMetadataDetail();
     CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, 
SegmentStatus.SUCCESS,
         loadModel.getFactTimeStamp(), true);
-    CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, 
loadModel.getSegmentId(),
-        loadModel.getCarbonDataLoadSchema().getCarbonTable());
-    CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, 
overwriteSet);
-    new 
CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath);
+    CarbonTable carbonTable = 
loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    long segmentSize = CarbonLoaderUtil
+        .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), 
carbonTable);
+    if (segmentSize > 0) {
+      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, 
overwriteSet);
+      new 
CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath);
+      String updateTime =
+          
context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
+      if (updateTime != null) {
+        Set<String> segmentSet = new HashSet<>(
+            new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
+                .getValidAndInvalidSegments().getValidSegments());
+        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, 
updateTime, true,
+            new ArrayList<String>());
+      }
+    } else {
+      CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 6a2349a..22eca9f 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -128,6 +128,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
   public static final String TABLE_NAME = 
"mapreduce.input.carboninputformat.tableName";
   public static final String PARTITIONS_TO_PRUNE =
       "mapreduce.input.carboninputformat.partitions.to.prune";
+  public static final String UPADTE_T =
+      "mapreduce.input.carboninputformat.partitions.to.prune";
 
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
@@ -915,7 +917,6 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
    * @param identifier
    * @return
    * @throws IOException
-   * @throws KeyGenException
    */
   public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier 
identifier,
       List<String> partitions) throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f11cb35..bd70e41 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -40,6 +40,8 @@ import 
org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWra
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
@@ -87,6 +89,13 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Stri
       "mapreduce.carbontable.dict.server.host";
   public static final String DICTIONARY_SERVER_PORT =
       "mapreduce.carbontable.dict.server.port";
+  /**
+   * Key for the update timestamp, set only in case of an update query. Its value
+   * is used as the update time when the load status is updated.
+   */
+  public static final String UPADTE_TIMESTAMP = 
"mapreduce.carbontable.update.timestamp";
+
+  private static final Log LOG = 
LogFactory.getLog(CarbonTableOutputFormat.class);
 
   private CarbonOutputCommitter committer;
 
@@ -373,15 +382,17 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Stri
     }
 
     @Override public void close(TaskAttemptContext taskAttemptContext) throws 
InterruptedException {
-      iteratorWrapper.close();
+      iteratorWrapper.closeWriter();
       try {
         future.get();
       } catch (ExecutionException e) {
+        LOG.error("Error while loading data", e);
         throw new InterruptedException(e.getMessage());
       } finally {
         executorService.shutdownNow();
         dataLoadExecutor.close();
       }
+      LOG.info("Closed partition writer task " + 
taskAttemptContext.getTaskAttemptID());
     }
 
     public CarbonLoadModel getLoadModel() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index efbe807..deaad20 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -104,6 +104,27 @@ class DeleteCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     )
   }
 
+  test("partition delete data from carbon table with alias [where clause ]") {
+    sql("drop table if exists iud_db.dest")
+    sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED 
BY(c3 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest""")
+    sql("""delete from iud_db.dest d where d.c1 = 'a'""").show
+    checkAnswer(
+      sql("""select c2 from iud_db.dest"""),
+      Seq(Row(2), Row(3),Row(4), Row(5))
+    )
+  }
+  test("partition delete data from  carbon table[where clause ]") {
+    sql("""drop table if exists iud_db.dest""")
+    sql("""create table iud_db.dest (c1 string,c2 int,c5 string) PARTITIONED 
BY(c3 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud_db.dest""")
+    sql("""delete from iud_db.dest where c2 = 2""").show
+    checkAnswer(
+      sql("""select c1 from iud_db.dest"""),
+      Seq(Row("a"), Row("c"), Row("d"), Row("e"))
+    )
+  }
+
   test("Records more than one pagesize after delete operation ") {
     sql("DROP TABLE IF EXISTS carbon2")
     import sqlContext.implicits._

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index f265e75..25a4999 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -569,6 +569,74 @@ class UpdateCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
         CarbonCommonConstants.defaultValueIsPersistEnabled)
   }
 
+  test("partition test update operation with 0 rows updation.") {
+    sql("""drop table if exists iud.zerorows_part""").show
+    sql("""create table iud.zerorows_part (c1 string,c2 int,c5 string) 
PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.zerorows_part""")
+    sql("""update iud.zerorows_part d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'a'""").show()
+    sql("""update iud.zerorows_part d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'xxx'""").show()
+    checkAnswer(
+      sql("""select c1,c2,c3,c5 from iud.zerorows_part"""),
+      
Seq(Row("a",2,"aa","aaa"),Row("b",2,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
+    )
+    sql("""drop table iud.zerorows_part""").show
+
+  }
+
+
+  test("partition update carbon table[select from source table with where and 
exist]") {
+    sql("""drop table if exists iud.dest11_part""").show
+    sql("""create table iud.dest11_part (c1 string,c2 int,c5 string) 
PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest11_part""")
+    sql("""update iud.dest11_part d set (d.c3, d.c5 ) = (select s.c33,s.c55 
from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest11_part"""),
+      Seq(Row("cc","ccc"), Row("dd","ddd"),Row("ee","eee"), 
Row("MGM","Disco"),Row("RGK","Music"))
+    )
+    sql("""drop table iud.dest11_part""").show
+  }
+
+  test("partition update carbon table[using destination table columns with 
where and exist]") {
+    sql("""drop table if exists iud.dest22_part""")
+    sql("""create table iud.dest22_part (c1 string,c2 int,c5 string) 
PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest22_part""")
+    checkAnswer(
+      sql("""select c2 from iud.dest22_part where c1='a'"""),
+      Seq(Row(1))
+    )
+    sql("""update iud.dest22_part d  set (d.c2) = (d.c2 + 1) where d.c1 = 
'a'""").show()
+    checkAnswer(
+      sql("""select c2 from iud.dest22_part where c1='a'"""),
+      Seq(Row(2))
+    )
+    sql("""drop table if exists iud.dest22_part""")
+  }
+
+  test("partition update carbon table without alias in set columns") {
+    sql("""drop table if exists iud.dest33_part""")
+    sql("""create table iud.dest33_part (c2 int,c3 string,c5 string) 
PARTITIONED BY(c1 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest33_part""")
+    sql("""update iud.dest33_part d set (c3,c5 ) = (select s.c33 ,s.c55  from 
iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest33_part where c1='a'"""),
+      Seq(Row("MGM","Disco"))
+    )
+    sql("""drop table if exists iud.dest33_part""")
+  }
+
+  test("partition update carbon table without alias in set columns with 
mulitple loads") {
+    sql("""drop table if exists iud.dest33_part""")
+    sql("""create table iud.dest33_part (c1 string,c2 int,c5 string) 
PARTITIONED BY(c3 string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest33_part""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table 
iud.dest33_part""")
+    sql("""update iud.dest33_part d set (c3,c5 ) = (select s.c33 ,s.c55  from 
iud.source2 s where d.c1 = s.c11) where d.c1 = 'a'""").show()
+    checkAnswer(
+      sql("""select c3,c5 from iud.dest33_part where c1='a'"""),
+      Seq(Row("MGM","Disco"),Row("MGM","Disco"))
+    )
+    sql("""drop table if exists iud.dest33_part""")
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index ae30300..dba8f0e 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -639,7 +639,7 @@ object CommonUtil {
   def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
     val metadataPath = 
model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
     val details = SegmentStatusManager.readLoadMetadata(metadataPath)
-    model.setLoadMetadataDetails(details.toList.asJava)
+    model.setLoadMetadataDetails(new 
util.ArrayList[LoadMetadataDetails](details.toList.asJava))
   }
 
   def configureCSVInputFormat(configuration: Configuration,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 69be362..7492951 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -38,6 +38,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{DataCommand, 
DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, 
CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
@@ -490,7 +491,24 @@ case class CarbonLoadDataCommand(
         }
         InternalRow.fromSeq(data)
       }
-      LogicalRDD(attributes, rdd)(sparkSession)
+      if (updateModel.isDefined) {
+        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+        // In case of update, we don't need the segmentid column in case of 
partitioning
+        val dropAttributes = attributes.dropRight(1)
+        val finalOutput = relation.output.map { attr =>
+          dropAttributes.find { d =>
+            val index = d.name.lastIndexOf("-updatedColumn")
+            if (index > 0) {
+              d.name.substring(0, index).equalsIgnoreCase(attr.name)
+            } else {
+              d.name.equalsIgnoreCase(attr.name)
+            }
+          }.get
+        }
+        Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
+      } else {
+        LogicalRDD(attributes, rdd)(sparkSession)
+      }
 
     } else {
       var timeStampformatString = carbonLoadModel.getTimestampformat
@@ -618,6 +636,9 @@ case class CarbonLoadDataCommand(
     options += (("onepass", loadModel.getUseOnePass.toString))
     options += (("dicthost", loadModel.getDictionaryServerHost))
     options += (("dictport", loadModel.getDictionaryServerPort.toString))
+    if (updateModel.isDefined) {
+      options += (("updatetimestamp", 
updateModel.get.updatedTimeStamp.toString))
+    }
     val hdfsRelation = HadoopFsRelation(
       location = catalog,
       partitionSchema = partitionSchema,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
new file mode 100644
index 0000000..c68e43c
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, 
AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, 
CarbonDropPartitionRDD}
+
+/**
+ * Drop the partitions from hive and carbon store. It drops the partitions in 
the following steps
+ * 1. Drop the partitions from carbon store, it just creates one new mapper 
file in each segment
+ * with uniqueid.
+ * 2. Drop partitions from hive.
+ * 3. If any of the above steps fails, roll back the newly created files
+ * 4. After success of steps 1 and 2 , it commits the files by removing the 
old files.
+ * Here it does not remove any data from store. During compaction the old data 
won't be considered.
+ * @param tableName
+ * @param specs
+ * @param ifExists
+ * @param purge
+ * @param retainData
+ */
+case class CarbonAlterTableDropHivePartitionCommand(
+    tableName: TableIdentifier,
+    specs: Seq[TablePartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean,
+    retainData: Boolean)
+  extends AtomicRunnableCommand {
+
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    if (table.isHivePartitionTable) {
+      try {
+        specs.flatMap(f => 
sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+      } catch {
+        case e: Exception =>
+          if (!ifExists) {
+            throw e
+          } else {
+            log.warn(e.getMessage)
+            return Seq.empty[Row]
+          }
+      }
+
+      // Drop the partitions from hive.
+      AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, 
retainData)
+        .run(sparkSession)
+    }
+    Seq.empty[Row]
+  }
+
+
+  override def undoMetadata(sparkSession: SparkSession, exception: Exception): 
Seq[Row] = {
+    AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists)
+    val msg = s"Got exception $exception when processing data of drop 
partition." +
+              "Adding back partitions to the metadata"
+    LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
+    Seq.empty[Row]
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    var locks = List.empty[ICarbonLock]
+    val uniqueId = System.currentTimeMillis().toString
+    try {
+      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+        LockUsage.COMPACTION_LOCK,
+        LockUsage.DELETE_SEGMENT_LOCK,
+        LockUsage.DROP_TABLE_LOCK,
+        LockUsage.CLEAN_FILES_LOCK,
+        LockUsage.ALTER_PARTITION_LOCK)
+      locks = AlterTableUtil.validateTableAndAcquireLock(
+        table.getDatabaseName,
+        table.getTableName,
+        locksToBeAcquired)(sparkSession)
+      val partitionNames = specs.flatMap { f =>
+        f.map(k => k._1 + "=" + k._2)
+      }.toSet
+      val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
+        .getValidAndInvalidSegments.getValidSegments
+      try {
+        // First drop the partitions from partition mapper files of each 
segment
+        new CarbonDropPartitionRDD(sparkSession.sparkContext,
+          table.getTablePath,
+          segments.asScala,
+          partitionNames.toSeq,
+          uniqueId).collect()
+      } catch {
+        case e: Exception =>
+          // roll back the drop partitions from carbon store
+          new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
+            table.getTablePath,
+            segments.asScala,
+            false,
+            uniqueId).collect()
+          throw e
+      }
+      // commit the drop partitions from carbon store
+      new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
+        table.getTablePath,
+        segments.asScala,
+        true,
+        uniqueId).collect()
+      // Update the loadstatus with update time to clear cache from driver.
+      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
+        
.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
+      CarbonUpdateUtil.updateTableMetadataStatus(
+        segmentSet,
+        table,
+        uniqueId,
+        true,
+        new util.ArrayList[String])
+      
DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
+    } finally {
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty[Row]
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
deleted file mode 100644
index fd59587..0000000
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.partition
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, 
AlterTableDropPartitionCommand, AtomicRunnableCommand}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, 
CarbonDropPartitionRDD}
-
-/**
- * Drop the partitions from hive and carbon store. It drops the partitions in 
following steps
- * 1. Drop the partitions from carbon store, it just create one new mapper 
file in each segment
- * with uniqueid.
- * 2. Drop partitions from hive.
- * 3. In any above step fails then roll back the newly created files
- * 4. After success of steps 1 and 2 , it commits the files by removing the 
old fails.
- * Here it does not remove any data from store. During compaction the old data 
won't be considered.
- * @param tableName
- * @param specs
- * @param ifExists
- * @param purge
- * @param retainData
- */
-case class CarbonStandardAlterTableDropPartition(
-    tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
-    ifExists: Boolean,
-    purge: Boolean,
-    retainData: Boolean)
-  extends AtomicRunnableCommand {
-
-
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-    if (table.isHivePartitionTable) {
-      try {
-        specs.flatMap(f => 
sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
-      } catch {
-        case e: Exception =>
-          if (!ifExists) {
-            throw e
-          } else {
-            log.warn(e.getMessage)
-            return Seq.empty[Row]
-          }
-      }
-
-      // Drop the partitions from hive.
-      AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, 
retainData)
-        .run(sparkSession)
-    }
-    Seq.empty[Row]
-  }
-
-
-  override def undoMetadata(sparkSession: SparkSession, exception: Exception): 
Seq[Row] = {
-    AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists)
-    val msg = s"Got exception $exception when processing data of drop 
partition." +
-              "Adding back partitions to the metadata"
-    LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
-    Seq.empty[Row]
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-    var locks = List.empty[ICarbonLock]
-    val uniqueId = System.currentTimeMillis().toString
-    try {
-      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
-        LockUsage.COMPACTION_LOCK,
-        LockUsage.DELETE_SEGMENT_LOCK,
-        LockUsage.DROP_TABLE_LOCK,
-        LockUsage.CLEAN_FILES_LOCK,
-        LockUsage.ALTER_PARTITION_LOCK)
-      locks = AlterTableUtil.validateTableAndAcquireLock(
-        table.getDatabaseName,
-        table.getTableName,
-        locksToBeAcquired)(sparkSession)
-      val partitionNames = specs.flatMap { f =>
-        f.map(k => k._1 + "=" + k._2)
-      }.toSet
-      val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
-        .getValidAndInvalidSegments.getValidSegments
-      try {
-        // First drop the partitions from partition mapper files of each 
segment
-        new CarbonDropPartitionRDD(sparkSession.sparkContext,
-          table.getTablePath,
-          segments.asScala,
-          partitionNames.toSeq,
-          uniqueId).collect()
-      } catch {
-        case e: Exception =>
-          // roll back the drop partitions from carbon store
-          new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
-            table.getTablePath,
-            segments.asScala,
-            false,
-            uniqueId).collect()
-          throw e
-      }
-      // commit the drop partitions from carbon store
-      new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
-        table.getTablePath,
-        segments.asScala,
-        true,
-        uniqueId).collect()
-      // Update the loadstatus with update time to clear cache from driver.
-      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
-        
.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
-      CarbonUpdateUtil.updateTableMetadataStatus(
-        segmentSet,
-        table,
-        uniqueId,
-        true,
-        new util.ArrayList[String])
-      
DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
-    } finally {
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty[Row]
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 3eed726..a95693c 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -104,8 +104,15 @@ with Serializable {
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
-    CarbonTableOutputFormat.setLoadModel(conf, model)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+    // Set the update timestamp if user sets in case of update query. It needs to be updated
+    // in load status update time
+    val updateTimeStamp = options.getOrElse("updatetimestamp", null)
+    if (updateTimeStamp != null) {
+      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp)
+      model.setFactTimeStamp(updateTimeStamp.toLong)
+    }
+    CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index eeadbf6..89fcfd2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
-import org.apache.spark.sql.execution.command.partition.{CarbonShowCarbonPartitionsCommand, CarbonStandardAlterTableDropPartition}
+import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand}
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
@@ -174,7 +174,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(tableName)(sparkSession)
         if (isCarbonTable) {
           ExecutedCommandExec(
-            CarbonStandardAlterTableDropPartition(
+            CarbonAlterTableDropHivePartitionCommand(
               tableName,
               specs,
               ifExists,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 19c8f03..7b1ab9d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -104,6 +104,11 @@ public class CarbonDataLoadConfiguration {
   // contains metadata used in write step of loading process
   private TableSpec tableSpec;
 
+  /**
+   * Number of thread cores to use while writing data files
+   */
+  private short writingCoresCount;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -351,5 +356,11 @@ public class CarbonDataLoadConfiguration {
     this.tableSpec = tableSpec;
   }
 
+  public short getWritingCoresCount() {
+    return writingCoresCount;
+  }
 
+  public void setWritingCoresCount(short writingCoresCount) {
+    this.writingCoresCount = writingCoresCount;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 787cb7b..87d5b97 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -224,7 +224,11 @@ public final class DataLoadProcessBuilder {
     configuration.setPreFetch(loadModel.isPreFetch());
     configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
-
+    // For partition loading always use single core as it already runs in multiple
+    // threads per partition
+    if (carbonTable.isHivePartitionTable()) {
+      configuration.setWritingCoresCount((short) 1);
+    }
     TableSpec tableSpec = new TableSpec(dimensions, measures);
     configuration.setTableSpec(tableSpec);
     return configuration;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 08d1497..66943c8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -18,15 +18,21 @@
 package org.apache.carbondata.processing.loading.iterator;
 
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.CarbonIterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * It is wrapper class to hold the rows in batches when record writer writes the data and allows
  * to iterate on it during data load. It uses blocking queue to coordinate between read and write.
  */
 public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
 
+  private static final Log LOG = LogFactory.getLog(CarbonOutputIteratorWrapper.class);
+
   private boolean close = false;
 
   /**
@@ -50,34 +56,48 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
 
   @Override
   public boolean hasNext() {
-    return !queue.isEmpty() || !close || readBatch != null && readBatch.hasNext();
-  }
-
-  @Override
-  public String[] next() {
-    if (readBatch == null || !readBatch.hasNext()  && !close) {
+    if (readBatch == null || !readBatch.hasNext()) {
       try {
-        readBatch = queue.take();
+        if (!close) {
+          readBatch = queue.poll(5, TimeUnit.MINUTES);
+          if (readBatch == null) {
+            LOG.warn("This scenario should not happen");
+            return false;
+          }
+        } else {
+          readBatch = queue.poll();
+          if (readBatch == null) {
+            return false;
+          }
+        }
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
     }
-    return readBatch.next();
+    return readBatch.hasNext();
   }
 
   @Override
-  public void close() {
-    if (loadBatch.isLoading()) {
-      try {
-        loadBatch.readyRead();
-        if (loadBatch.size > 0) {
-          queue.put(loadBatch);
-        }
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+  public String[] next() {
+    return readBatch.next();
+  }
+
+  public void closeWriter() {
+    try {
+      loadBatch.readyRead();
+      if (loadBatch.size > 0) {
+        queue.put(loadBatch);
       }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
     }
     close = true;
+    // It is required if the thread waits for take.
+    if (queue.isEmpty()) {
+      if (!queue.offer(new RowBatch(0))) {
+        LOG.warn("The default last element is not added to queue");
+      }
+    }
   }
 
   private static class RowBatch extends CarbonIterator<String[]> {
@@ -88,8 +108,6 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
 
     private int size;
 
-    private boolean isLoading = true;
-
     private RowBatch(int size) {
       batch = new String[size][];
       this.size = size;
@@ -108,11 +126,6 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
     public void readyRead() {
       size = counter;
       counter = 0;
-      isLoading = false;
-    }
-
-    public boolean isLoading() {
-      return isLoading;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 75fcea3..2e62d28 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -161,6 +161,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     if (sortScope != null && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
       numberOfCores = 1;
     }
+    // Overriding it to the task specified cores.
+    if (model.getWritingCoresCount() > 0) {
+      numberOfCores = model.getWritingCoresCount();
+    }
 
     blockletProcessingCount = new AtomicInteger(0);
     producerExecutorService = Executors.newFixedThreadPool(numberOfCores,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 96bd2e3..d15152c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -161,6 +161,8 @@ public class CarbonFactDataHandlerModel {
 
   private DataMapWriterListener dataMapWriterlistener;
 
+  private short writingCoresCount;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -260,6 +262,7 @@ public class CarbonFactDataHandlerModel {
     DataMapWriterListener listener = new DataMapWriterListener();
     listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId());
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
+    carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
 
     return carbonFactDataHandlerModel;
   }
@@ -321,6 +324,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.tableSpec = new TableSpec(
         segmentProperties.getDimensions(),
         segmentProperties.getMeasures());
+
     return carbonFactDataHandlerModel;
   }
 
@@ -563,6 +567,10 @@ public class CarbonFactDataHandlerModel {
     return sortScope;
   }
 
+  public short getWritingCoresCount() {
+    return writingCoresCount;
+  }
+
   public DataMapWriterListener getDataMapWriterlistener() {
     return dataMapWriterlistener;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a89587e7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 10fdd31..fef6930 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -858,17 +858,19 @@ public final class CarbonLoaderUtil {
   }
 
   /*
-   * This method will add data size and index size into tablestatus for each segment
+   * This method will add data size and index size into tablestatus for each segment. And also
+   * returns the size of the segment.
    */
-  public static void addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
+  public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
       String segmentId, CarbonTable carbonTable) throws IOException {
     CarbonTablePath carbonTablePath =
         CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
     Map<String, Long> dataIndexSize =
         CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
-    loadMetadataDetails
-        .setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString());
-    loadMetadataDetails
-        .setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString());
+    Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
+    loadMetadataDetails.setDataSize(String.valueOf(dataSize));
+    Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);
+    loadMetadataDetails.setIndexSize(String.valueOf(indexSize));
+    return dataSize + indexSize;
   }
 }

Reply via email to