[REBASE] Solve conflict after merging master

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7540cc9c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7540cc9c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7540cc9c

Branch: refs/heads/carbonstore-rebase5
Commit: 7540cc9cab404c06eaf5c92da77ef7e36f9182b5
Parents: 8a9dd8b
Author: Jacky Li <jacky.li...@qq.com>
Authored: Tue Feb 27 11:26:30 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Mar 2 16:00:28 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/dev/DataMap.java    |   9 +-
 .../core/datamap/dev/DataMapFactory.java        |   2 +-
 .../exception/ConcurrentOperationException.java |  16 +-
 .../core/indexstore/BlockletDetailsFetcher.java |   3 +-
 .../blockletindex/BlockletDataMap.java          |   3 +-
 .../blockletindex/SegmentIndexFileStore.java    |   2 -
 .../core/metadata/PartitionMapFileStore.java    |   0
 .../scan/executor/util/RestructureUtil.java     |   6 +-
 .../statusmanager/SegmentStatusManager.java     |  10 +-
 .../SegmentUpdateStatusManager.java             |   7 +-
 .../CarbonStreamSparkStreamingExample.scala     |  14 +-
 ...CarbonStructuredStreamingWithRowParser.scala |   8 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   5 +-
 .../preaggregate/TestPreAggCreateCommand.scala  |   2 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |   2 +-
 .../StandardPartitionGlobalSortTestCase.scala   |   2 +-
 .../exception/ProcessMetaDataException.java     |   2 +
 .../org/apache/carbondata/api/CarbonStore.scala |   6 +-
 .../carbondata/spark/load/CsvRDDHelper.scala    | 157 +++++++++++++++++++
 .../load/DataLoadProcessBuilderOnSpark.scala    |   3 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |   2 -
 .../command/carbonTableSchemaCommon.scala       |   6 +-
 .../CarbonAlterTableCompactionCommand.scala     |   3 +-
 .../management/CarbonCleanFilesCommand.scala    |   2 +-
 .../CarbonDeleteLoadByIdCommand.scala           |   2 +-
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   2 +-
 .../management/CarbonLoadDataCommand.scala      |  28 ++--
 .../CarbonProjectForDeleteCommand.scala         |   2 +-
 .../CarbonProjectForUpdateCommand.scala         |   2 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |   2 +-
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../datasources/CarbonFileFormat.scala          |   3 -
 .../TestStreamingTableWithRowParser.scala       |   9 +-
 .../vectorreader/AddColumnTestCases.scala       |   1 +
 .../datamap/DataMapWriterListener.java          |   3 +-
 .../loading/model/CarbonLoadModelBuilder.java   |  34 +++-
 .../processing/loading/model/LoadOption.java    |  15 +-
 .../processing/merger/CarbonDataMergerUtil.java |  19 ++-
 .../merger/CompactionResultSortProcessor.java   |   4 +-
 .../merger/RowResultMergerProcessor.java        |   4 +-
 .../partition/spliter/RowResultProcessor.java   |   4 +-
 .../util/CarbonDataProcessorUtil.java           |   3 +-
 store/sdk/pom.xml                               |   2 +-
 .../carbondata/sdk/file/CSVCarbonWriter.java    |   8 +-
 45 files changed, 305 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 4a68286..fdeacff 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -38,18 +38,13 @@ public interface DataMap<T extends Blocklet> {
   /**
    * Prune the datamap with filter expression and partition information. It 
returns the list of
    * blocklets where these filters can exist.
-   *
-   * @param filterExp
-   * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties 
segmentProperties, List<PartitionSpec> partitions);
+  List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties 
segmentProperties,
+      List<PartitionSpec> partitions);
 
   // TODO Move this method to Abstract class
   /**
    * Validate whether the current segment needs to be fetching the required 
data
-   *
-   * @param filterExp
-   * @return
    */
   boolean isScanRequired(FilterResolverIntf filterExp);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 50ac279..d8a467f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,8 +21,8 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
 
b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
index 7e717ba..918268c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java
@@ -17,21 +17,10 @@
 
 package org.apache.carbondata.core.exception;
 
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
-/**
- * This exception will be thrown when executing concurrent operations which
- * is not supported in carbon.
- *
- * For example, when INSERT OVERWRITE is executing, other operations are not
- * allowed, so this exception will be thrown
- */
-@InterfaceAudience.User
-@InterfaceStability.Stable
-public class ConcurrentOperationException extends Exception {
+public class ConcurrentOperationException extends 
MalformedCarbonCommandException {
 
   public ConcurrentOperationException(String dbName, String tableName, String 
command1,
       String command2) {
@@ -48,3 +37,4 @@ public class ConcurrentOperationException extends Exception {
   }
 
 }
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index dd592c0..58c11db 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -53,5 +53,6 @@ public interface BlockletDetailsFetcher {
    * @param segment
    * @return
    */
-  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> 
partitions) throws IOException;
+  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> 
partitions)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index ce6193b..b379ae3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -660,7 +660,8 @@ public class BlockletDataMap extends 
AbstractCoarseGrainDataMap implements Cache
   }
 
   @Override
-  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties 
segmentProperties, List<PartitionSpec> partitions) {
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties 
segmentProperties,
+      List<PartitionSpec> partitions) {
     if (unsafeMemoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 537e124..00d03a5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 2712cbc..e67d822 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -80,7 +80,7 @@ public class RestructureUtil {
       if (queryDimension.getDimension().hasEncoding(Encoding.IMPLICIT)) {
         presentDimension.add(queryDimension);
         isDimensionExists[dimIndex] = true;
-        dimensionInfo.dataType[queryDimension.getQueryOrder()] =
+        dimensionInfo.dataType[queryDimension.getOrdinal()] =
             queryDimension.getDimension().getDataType();
       } else {
         for (CarbonDimension tableDimension : tableBlockDimensions) {
@@ -95,7 +95,7 @@ public class RestructureUtil {
             currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
             presentDimension.add(currentBlockDimension);
             isDimensionExists[dimIndex] = true;
-            dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] =
+            dimensionInfo.dataType[currentBlockDimension.getOrdinal()] =
                 currentBlockDimension.getDimension().getDataType();
             break;
           }
@@ -113,7 +113,7 @@ public class RestructureUtil {
             currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
             presentDimension.add(currentBlockDimension);
             isDimensionExists[dimIndex] = true;
-            dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] =
+            dimensionInfo.dataType[currentBlockDimension.getOrdinal()] =
                 currentBlockDimension.getDimension().getDataType();
             break;
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 89666ab..1bb4b03 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
@@ -838,6 +839,13 @@ public class SegmentStatusManager {
   public static void deleteLoadsAndUpdateMetadata(
       CarbonTable carbonTable,
       boolean isForceDeletion) throws IOException {
+    deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+  }
+
+  public static void deleteLoadsAndUpdateMetadata(
+      CarbonTable carbonTable,
+      boolean isForceDeletion,
+      List<PartitionSpec> partitionSpecs) throws IOException {
     if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
       LoadMetadataDetails[] details =
           SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
@@ -882,7 +890,7 @@ public class SegmentStatusManager {
           CarbonLockUtil.fileUnlock(carbonTableStatusLock, 
LockUsage.TABLE_STATUS_LOCK);
           if (updationCompletionStatus) {
             DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
-                identifier, carbonTable.getMetadataPath(), isForceDeletion);
+                identifier, carbonTable.getMetadataPath(), isForceDeletion, 
partitionSpecs);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 6ec6fa2..4a2149e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -77,8 +77,8 @@ public class SegmentUpdateStatusManager {
     this.identifier = identifier;
     // current it is used only for read function scenarios, as file update 
always requires to work
     // on latest file status.
-    segmentDetails =
-        
SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(identifier.getTablePath()));
+    segmentDetails = SegmentStatusManager.readLoadMetadata(
+        CarbonTablePath.getMetadataPath(identifier.getTablePath()));
     if (segmentDetails.length > 0) {
       isPartitionTable = segmentDetails[0].getSegmentFile() != null;
     }
@@ -259,7 +259,8 @@ public class SegmentUpdateStatusManager {
             + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, 
TupleIdEnum.PART_ID)
             .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + 
completeBlockName;
       } else {
-        String carbonDataDirectoryPath = 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
+        String carbonDataDirectoryPath =
+            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
         blockPath =
             carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + 
completeBlockName;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
index f59a610..d3784e7 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -20,18 +20,14 @@ package org.apache.carbondata.examples
 import java.io.{File, PrintWriter}
 import java.net.ServerSocket
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.CarbonSparkStreamingFactory
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.CarbonSparkStreamingListener
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
@@ -77,7 +73,7 @@ object CarbonStreamSparkStreamingExample {
            | 'dictionary_include'='city')
            | """.stripMargin)
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      val tablePath = carbonTable.getTablePath
       // batch load
       val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -91,7 +87,7 @@ object CarbonStreamSparkStreamingExample {
       val serverSocket = new ServerSocket(7071)
       val thread1 = writeSocket(serverSocket)
       val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, tablePath, 
checkpointPath)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
       // add a Spark Streaming Listener to remove all lock for stream tables 
when stop app
       ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
       // wait for stop signal to stop Spark Streaming App
@@ -155,8 +151,8 @@ object CarbonStreamSparkStreamingExample {
     thread
   }
 
-  def startStreaming(spark: SparkSession, tableName: String,
-      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+  def startStreaming(
+      spark: SparkSession, tableName: String, checkpointPath: String): 
StreamingContext = {
     var ssc: StreamingContext = null
     try {
       // recommend: the batch interval must set larger, such as 30s, 1min.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
index cce833b..61c45d3 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -23,7 +23,7 @@ import java.net.ServerSocket
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Int)
@@ -77,7 +77,7 @@ object CarbonStructuredStreamingWithRowParser {
       }
 
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      val tablePath = carbonTable.getTablePath
       // batch load
       val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -140,7 +140,7 @@ object CarbonStructuredStreamingWithRowParser {
     thread
   }
 
-  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread 
= {
+  def startStreaming(spark: SparkSession, tablePath: String): Thread = {
     val thread = new Thread() {
       override def run(): Unit = {
         var qry: StreamingQuery = null
@@ -167,7 +167,7 @@ object CarbonStructuredStreamingWithRowParser {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", 
CarbonTablePath.getStreamingCheckpointDir(tablePath))
             .option("dbName", "default")
             .option("tableName", "stream_table_with_row_parser")
             .option(CarbonStreamParser.CARBON_STREAM_PARSER,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index b485b69..5184e07 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -33,8 +33,8 @@ import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.DataMapType;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -494,7 +494,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (Segment segment : streamSegments) {
-        String segmentDir = 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), 
segment.getSegmentNo());
+        String segmentDir = CarbonTablePath.getSegmentPath(
+            identifier.getTablePath(), segment.getSegmentNo());
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
           String indexName = CarbonTablePath.getCarbonStreamIndexFileName();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 8f63af6..e09b922 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import 
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index b39c44c..3f0ca42 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -34,11 +34,11 @@ import 
org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData
 import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
 
 // This testsuite test insert and insert overwrite with other commands 
concurrently

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 7d0959c..629417f 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.constants.LoggerAction
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class StandardPartitionGlobalSortTestCase extends QueryTest with 
BeforeAndAfterAll {
   var executorService: ExecutorService = _

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
index 3e06bde..471b645 100644
--- 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.exception;
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+
 // This exception will be thrown when processMetaData failed in
 // Carbon's RunnableCommand
 public class ProcessMetaDataException extends MalformedCarbonCommandException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index b69ec37..bfb1616 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -36,7 +36,6 @@ import 
org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 object CarbonStore {
   private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -139,9 +138,8 @@ object CarbonStore {
         carbonCleanFilesLock =
           CarbonLockUtil
             .getLockObject(absoluteTableIdentifier, 
LockUsage.CLEAN_FILES_LOCK, errorMsg)
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = true, carbonTable, 
currentTablePartitions.map(_.asJava).orNull)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
         currentTablePartitions match {
           case Some(partitions) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
new file mode 100644
index 0000000..36d8c51
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, 
TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
+import org.apache.carbondata.spark.util.CommonUtil
+
+object CsvRDDHelper {
+
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * createsw a RDD that does reading of multiple CSV files
+   */
+  def csvFileScanRDD(
+      spark: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration
+  ): RDD[InternalRow] = {
+    // 1. partition
+    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+    val defaultParallelism = spark.sparkContext.defaultParallelism
+    CommonUtil.configureCSVInputFormat(hadoopConf, model)
+    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val jobContext = new JobContextImpl(jobConf, null)
+    val inputFormat = new CSVInputFormat()
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val splitFiles = rawSplits.map { split =>
+      val fileSplit = split.asInstanceOf[FileSplit]
+      PartitionedFile(
+        InternalRow.empty,
+        fileSplit.getPath.toString,
+        fileSplit.getStart,
+        fileSplit.getLength,
+        fileSplit.getLocations)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes 
bytes, " +
+                s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq)
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
+    }
+    closePartition()
+
+    // 2. read function
+    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with 
Serializable {
+      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+        new Iterator[InternalRow] {
+          val hadoopConf = serializableConfiguration.value
+          val jobTrackerId: String = {
+            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+            formatter.format(new Date())
+          }
+          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 
0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, 
attemptId)
+          val inputSplit =
+            new FileSplit(new Path(file.filePath), file.start, file.length, 
file.locations)
+          var finished = false
+          val inputFormat = new CSVInputFormat()
+          val reader = inputFormat.createRecordReader(inputSplit, 
hadoopAttemptContext)
+          reader.initialize(inputSplit, hadoopAttemptContext)
+
+          override def hasNext: Boolean = {
+            if (!finished) {
+              if (reader != null) {
+                if (reader.nextKeyValue()) {
+                  true
+                } else {
+                  finished = true
+                  reader.close()
+                  false
+                }
+              } else {
+                finished = true
+                false
+              }
+            } else {
+              false
+            }
+          }
+
+          override def next(): InternalRow = {
+            new 
GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+          }
+        }
+      }
+    }
+    new FileScanRDD(spark, readFunction, partitions)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index e1bd84b..1062cd7 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -34,7 +34,6 @@ import 
org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, 
NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -52,7 +51,7 @@ object DataLoadProcessBuilderOnSpark {
     } else {
       // input data from files
       val columnCount = model.getCsvHeaderColumns.length
-      DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
+      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 33263d6..298c84e 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel}
 import org.apache.spark.sql.types._
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogService
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, 
DictionaryColumnUniqueIdentifier}
@@ -45,7 +46,6 @@ import 
org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 9104a32..d3093fb 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -816,8 +816,6 @@ object CommonUtil {
                   val carbonTable = CarbonMetadata.getInstance
                     
.getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
                   
SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
-                  DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-                    isForceDeletion = true, carbonTable, null)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 71ce2c6..3c21af3 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -31,19 +31,15 @@ import 
org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, 
DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
RelationIdentifier, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, 
ParentColumnTableRelation}
-import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index e47c500..2f4aa30 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableModel, 
AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
@@ -47,7 +47,6 @@ import 
org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEv
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, 
CompactionType}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
 import org.apache.carbondata.streaming.segment.StreamSegment
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index d2adc57..2092028 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, 
OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 0861c63..81427a1 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, 
DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByIdCommand(
     loadIds: Seq[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dcbc6ce..1d76bda 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, 
DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByLoadDateCommand(
     databaseNameOp: Option[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 4806a6f..34a464a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.SegmentStatus
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
DataTypeUtil}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
@@ -71,18 +70,15 @@ import 
org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, 
LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, 
LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
 import 
org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, 
CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, 
GlobalDictionaryUtil}
-import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
+import org.apache.carbondata.spark.load.{CsvRDDHelper, 
DataLoadProcessorStepOnSpark}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, 
GlobalDictionaryUtil, SparkDataTypeConverterImpl}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, 
GlobalDictionaryUtil, SparkDataTypeConverterImpl}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -193,12 +189,18 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, 
"false").toBoolean)
       
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
+
+      val javaPartition = mutable.Map[String, String]()
+      partition.foreach { case (k, v) =>
+        if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+      }
+
       new CarbonLoadModelBuilder(table).build(
         options.asJava,
         optionsFinal,
         carbonLoadModel,
         hadoopConf,
-        partition,
+        javaPartition.asJava,
         dataFrame.isDefined)
       // Delete stale segment folders that are not in table status but are 
physically present in
       // the Fact folder
@@ -231,11 +233,7 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load 
data
         LOGGER.info(s"Initiating Direct Load for the Table : 
($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry 
for new load.
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = false,
-          table,
-          currPartitions)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, 
currPartitions)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
           CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -672,7 +670,7 @@ case class CarbonLoadDataCommand(
           }
         }
         val columnCount = carbonLoadModel.getCsvHeaderColumns.length
-        val rdd = DataLoadingUtil.csvFileScanRDD(
+        val rdd = CsvRDDHelper.csvFileScanRDD(
           sparkSession,
           model = carbonLoadModel,
           hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, 
columnCount))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index f074285..230378b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 5165342..2a92478 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,10 +25,10 @@ import 
org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.storage.StorageLevel
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 5f8eb12..474f9c6 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -23,12 +23,12 @@ import 
org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
 
-import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 8001a93..0298eea 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -28,12 +28,12 @@ import 
org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, 
ProcessMetaDataException}
 
 case class CarbonDropTableCommand(
     ifExistsSet: Boolean,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 61a31a5..2eed988 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -50,11 +50,8 @@ import 
org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
 import 
org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, 
Util}
 
 class CarbonFileFormat
   extends FileFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 3e3b2c5..064ff28 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Integer)
@@ -735,7 +735,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       port: Int,
-      tablePath: CarbonTablePath,
+      tablePath: String,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
@@ -776,7 +776,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", 
CarbonTablePath.getStreamingCheckpointDir(tablePath))
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
@@ -817,7 +817,6 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
     val identifier = new TableIdentifier(tableName, Option("streaming1"))
     val carbonTable = 
CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket()
@@ -830,7 +829,7 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
       val thread2 = createSocketStreamingThread(
         spark = spark,
         port = server.getLocalPort,
-        tablePath = tablePath,
+        tablePath = carbonTable.getTablePath,
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index 995f041..d94570a 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 1104229..66f8bc5 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,8 @@ public class DataMapWriterListener {
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
     List<AbstractDataMapWriter> writers = registry.get(columns);
-    AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, 
null), dataWritePath);
+    AbstractDataMapWriter writer = factory.createWriter(
+        new Segment(segmentId, null), dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 17e8dbe..29dfa40 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder {
       Map<String, String> optionsFinal,
       CarbonLoadModel carbonLoadModel,
       Configuration hadoopConf) throws InvalidLoadOptionException, IOException 
{
+    build(options, optionsFinal, carbonLoadModel, hadoopConf, new 
HashMap<String, String>(), false);
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param optionsFinal Load options that populated with default values for 
optional options
+   * @param carbonLoadModel The output load model
+   * @param hadoopConf hadoopConf is needed to read CSV header if there 
'fileheader' is not set in
+   *                   user provided load options
+   * @param partitions partition name map to path
+   * @param isDataFrame true if build for load for dataframe
+   */
+  public void build(
+      Map<String, String> options,
+      Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf,
+      Map<String, String> partitions,
+      boolean isDataFrame) throws InvalidLoadOptionException, IOException {
     carbonLoadModel.setTableName(table.getTableName());
     carbonLoadModel.setDatabaseName(table.getDatabaseName());
     carbonLoadModel.setTablePath(table.getTablePath());
@@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
     carbonLoadModel.setCsvHeader(fileHeader);
     carbonLoadModel.setColDictFilePath(column_dict);
+
+    List<String> ignoreColumns = new ArrayList<>();
+    if (!isDataFrame) {
+      for (Map.Entry<String, String> partition : partitions.entrySet()) {
+        if (partition.getValue() != null) {
+          ignoreColumns.add(partition.getKey());
+        }
+      }
+    }
+
     carbonLoadModel.setCsvHeaderColumns(
-        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, 
ignoreColumns));
 
     int validatedMaxColumns = validateMaxColumns(
         carbonLoadModel.getCsvHeaderColumns(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 5af4859..bac1a94 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.Maps;
@@ -201,6 +203,16 @@ public class LoadOption {
   public static String[] getCsvHeaderColumns(
       CarbonLoadModel carbonLoadModel,
       Configuration hadoopConf) throws IOException {
+    return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new 
LinkedList<String>());
+  }
+
+  /**
+   * Return CSV header field names, with partition column
+   */
+  public static String[] getCsvHeaderColumns(
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf,
+      List<String> staticPartitionCols) throws IOException {
     String delimiter;
     if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
       delimiter = CarbonCommonConstants.COMMA;
@@ -231,7 +243,7 @@ public class LoadOption {
     }
 
     if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), 
csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema())) {
+        carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {
         LOG.error("CSV header in DDL is not proper."
             + " Column names in schema and CSV header are not the same.");
@@ -249,4 +261,5 @@ public class LoadOption {
     }
     return csvColumns;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index d2faef5..142b2cb 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -21,7 +21,16 @@ import java.io.File;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -32,7 +41,6 @@ import 
org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
@@ -592,10 +600,6 @@ public final class CarbonDataMergerUtil {
     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
-    CarbonTable carbonTable = 
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTableIdentifier tableIdentifier = 
carbonTable.getCarbonTableIdentifier();
-
-
     // total length
     long totalLength = 0;
 
@@ -1013,7 +1017,8 @@ public final class CarbonDataMergerUtil {
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
-    String segmentPath = 
CarbonTablePath.getSegmentPath(absoluteTableIdentifier.getTablePath(), 
seg.getSegmentNo());
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        absoluteTableIdentifier.getTablePath(), seg.getSegmentNo());
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, 
FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index ea11e22..ebcf944 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -405,8 +405,8 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
           partitionSpec.getLocation().toString() + 
CarbonCommonConstants.FILE_SEPARATOR
               + carbonLoadModel.getFactTimeStamp() + ".tmp";
     } else {
-      carbonStoreLocation = CarbonDataProcessorUtil
-          .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), 
tableName, carbonLoadModel.getSegmentId());
+      carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+          carbonLoadModel.getDatabaseName(), tableName, 
carbonLoadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = 
CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, 
segmentProperties, tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 278d5bb..2616def 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -76,8 +76,8 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
           partitionSpec.getLocation().toString() + 
CarbonCommonConstants.FILE_SEPARATOR + loadModel
               .getFactTimeStamp() + ".tmp";
     } else {
-      carbonStoreLocation = CarbonDataProcessorUtil
-          .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, 
loadModel.getSegmentId());
+      carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
+          loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = 
CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, 
tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index df2e2a2..221697f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -47,8 +47,8 @@ public class RowResultProcessor {
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
     this.segmentProperties = segProp;
     String tableName = carbonTable.getTableName();
-    String carbonStoreLocation = CarbonDataProcessorUtil
-        .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, 
loadModel.getSegmentId());
+    String carbonStoreLocation = 
CarbonDataProcessorUtil.createCarbonStoreLocation(
+        loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, 
carbonTable,
             segProp, tableName, tempStoreLocation, carbonStoreLocation);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index dc8ffd7..19ad47d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -370,7 +370,8 @@ public final class CarbonDataProcessorUtil {
    *
    * @return data directory path
    */
-  public static String createCarbonStoreLocation(String databaseName, String 
tableName, String segmentId) {
+  public static String createCarbonStoreLocation(String databaseName, String 
tableName,
+      String segmentId) {
     CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), 
segmentId);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 54fba55..b3dd464 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.apache.carbondata</groupId>
     <artifactId>carbondata-parent</artifactId>
-    <version>1.3.0-SNAPSHOT</version>
+    <version>1.4.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7540cc9c/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index dc5696a..df6afc6 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -23,7 +23,7 @@ import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 import org.apache.hadoop.conf.Configuration;
@@ -42,9 +42,9 @@ import 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 @InterfaceAudience.Internal
 class CSVCarbonWriter extends CarbonWriter {
 
-  private RecordWriter<NullWritable, StringArrayWritable> recordWriter;
+  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
   private TaskAttemptContext context;
-  private StringArrayWritable writable;
+  private ObjectArrayWritable writable;
 
   CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
     Configuration hadoopConf = new Configuration();
@@ -57,7 +57,7 @@ class CSVCarbonWriter extends CarbonWriter {
     TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, 
attemptID);
     this.recordWriter = format.getRecordWriter(context);
     this.context = context;
-    this.writable = new StringArrayWritable();
+    this.writable = new ObjectArrayWritable();
   }
 
   /**

Reply via email to