This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1084136  [CARBONDATA-3517] Add Segment with Path
1084136 is described below

commit 108413610f96bfb16c2036eb2a35a71080c42594
Author: ravipesala <[email protected]>
AuthorDate: Tue Sep 10 15:42:09 2019 +0530

    [CARBONDATA-3517] Add Segment with Path
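    
    A brief usage sketch of the added DDL, assuming a Spark session with
    CarbonData and an external folder that contains carbon data files written
    with the same schema as the target table (table name and path are
    illustrative, in the style of the new AddSegmentTestCase):
    
      sql("ALTER TABLE addsegment1 ADD SEGMENT " +
        "OPTIONS('path'='/tmp/addsegtest', 'format'='carbon')")
      sql("SELECT count(*) FROM addsegment1").show()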
    
    This closes #3381
---
 .../apache/carbondata/core/datamap/Segment.java    |  42 ++++
 .../block/SegmentPropertiesAndSchemaHolder.java    |  37 ++-
 .../indexstore/blockletindex/BlockDataMap.java     |   7 +-
 .../carbondata/core/metadata/SegmentFileStore.java | 116 ++++++++-
 .../carbondata/core/statusmanager/FileFormat.java  |  42 +++-
 .../core/statusmanager/LoadMetadataDetails.java    |  19 +-
 .../statusmanager/SegmentUpdateStatusManager.java  |  39 +--
 .../apache/carbondata/core/util/CarbonUtil.java    |  24 +-
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 116 ++++++++-
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 280 +++++++++++++++++++++
 .../org/apache/carbondata/api/CarbonStore.scala    |   2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  11 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   1 +
 .../spark/rdd/CarbonDataRDDFactory.scala           |  10 +-
 .../command/management/CarbonAddLoadCommand.scala  | 205 +++++++++++++++
 .../command/mutation/DeleteExecution.scala         |  17 +-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  12 +-
 .../loading/CarbonDataLoadConfiguration.java       |  10 +
 .../processing/loading/DataLoadProcessBuilder.java |  11 +
 .../store/CarbonFactDataHandlerModel.java          |  10 +-
 .../streaming/segment/StreamSegment.java           |   2 +-
 21 files changed, 931 insertions(+), 82 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index ad80182..a235f0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -69,6 +69,21 @@ public class Segment implements Serializable, Writable {
 
   private long indexSize = 0;
 
+  /**
+   * Whether to cache the segment data maps in executors or not.
+   */
+  private boolean isCacheable = true;
+
+  /**
+   * Path where the segment data exists
+   */
+  private transient String segmentPath;
+
+  /**
+   * Options of the segment provided by the user.
+   */
+  private transient Map<String, String> options;
+
   public Segment() {
 
   }
@@ -101,6 +116,13 @@ public class Segment implements Serializable, Writable {
     }
   }
 
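+  /**
+   * Creates a segment that points to an externally added data folder.
+   *
+   * @param segmentNo       segment number
+   * @param segmentFileName segment file name
+   * @param segmentPath     absolute path where the segment data resides
+   * @param options         user provided options of the added segment
+   */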
+  public Segment(String segmentNo, String segmentFileName, String segmentPath,
+      Map<String, String> options) {
+    this(segmentNo, segmentFileName);
+    this.segmentPath = segmentPath;
+    this.options = options;
+  }
+
   /**
    *
    * @param segmentNo
@@ -282,6 +304,26 @@ public class Segment implements Serializable, Writable {
     this.indexSize = indexSize;
   }
 
+  public boolean isCacheable() {
+    return isCacheable;
+  }
+
+  public void setCacheable(boolean cacheable) {
+    isCacheable = cacheable;
+  }
+
+  public String getSegmentPath() {
+    return segmentPath;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+
   @Override public void write(DataOutput out) throws IOException {
     out.writeUTF(segmentNo);
     boolean writeSegmentFileName = segmentFileName != null;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index 056a0e7..00b624f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -295,6 +295,8 @@ public class SegmentPropertiesAndSchemaHolder {
     // of maintaining 2 variables
     private CarbonRowSchema[] taskSummarySchemaForBlock;
     private CarbonRowSchema[] taskSummarySchemaForBlocklet;
+    private CarbonRowSchema[] taskSummarySchemaForBlockWithOutFilePath;
+    private CarbonRowSchema[] taskSummarySchemaForBlockletWithOutFilePath;
     private CarbonRowSchema[] fileFooterEntrySchemaForBlock;
     private CarbonRowSchema[] fileFooterEntrySchemaForBlocklet;
 
@@ -325,6 +327,8 @@ public class SegmentPropertiesAndSchemaHolder {
 
       taskSummarySchemaForBlock = null;
       taskSummarySchemaForBlocklet = null;
+      taskSummarySchemaForBlockWithOutFilePath = null;
+      taskSummarySchemaForBlockletWithOutFilePath = null;
       fileFooterEntrySchemaForBlock = null;
       fileFooterEntrySchemaForBlocklet = null;
     }
@@ -393,7 +397,7 @@ public class SegmentPropertiesAndSchemaHolder {
 
     public CarbonRowSchema[] getTaskSummarySchemaForBlock(boolean 
storeBlockletCount,
         boolean filePathToBeStored) throws MemoryException {
-      if (null == taskSummarySchemaForBlock) {
+      if (null == taskSummarySchemaForBlock && filePathToBeStored) {
         synchronized (taskSchemaLock) {
           if (null == taskSummarySchemaForBlock) {
             taskSummarySchemaForBlock = SchemaGenerator
@@ -401,13 +405,26 @@ public class SegmentPropertiesAndSchemaHolder {
                     storeBlockletCount, filePathToBeStored);
           }
         }
+      } else if (null == taskSummarySchemaForBlockWithOutFilePath && 
!filePathToBeStored) {
+        synchronized (taskSchemaLock) {
+          if (null == taskSummarySchemaForBlockWithOutFilePath) {
+            taskSummarySchemaForBlockWithOutFilePath = SchemaGenerator
+                .createTaskSummarySchema(segmentProperties, 
getMinMaxCacheColumns(),
+                    storeBlockletCount, filePathToBeStored);
+          }
+        }
+      }
+      if (filePathToBeStored) {
+        return taskSummarySchemaForBlock;
+      } else {
+        return taskSummarySchemaForBlockWithOutFilePath;
       }
-      return taskSummarySchemaForBlock;
+
     }
 
     public CarbonRowSchema[] getTaskSummarySchemaForBlocklet(boolean 
storeBlockletCount,
         boolean filePathToBeStored) throws MemoryException {
-      if (null == taskSummarySchemaForBlocklet) {
+      if (null == taskSummarySchemaForBlocklet && filePathToBeStored) {
         synchronized (taskSchemaLock) {
           if (null == taskSummarySchemaForBlocklet) {
             taskSummarySchemaForBlocklet = SchemaGenerator
@@ -415,8 +432,20 @@ public class SegmentPropertiesAndSchemaHolder {
                     storeBlockletCount, filePathToBeStored);
           }
         }
+      } else if (null == taskSummarySchemaForBlockletWithOutFilePath && 
!filePathToBeStored) {
+        synchronized (taskSchemaLock) {
+          if (null == taskSummarySchemaForBlockletWithOutFilePath) {
+            taskSummarySchemaForBlockletWithOutFilePath = SchemaGenerator
+                .createTaskSummarySchema(segmentProperties, 
getMinMaxCacheColumns(),
+                    storeBlockletCount, filePathToBeStored);
+          }
+        }
+      }
+      if (filePathToBeStored) {
+        return taskSummarySchemaForBlocklet;
+      } else {
+        return taskSummarySchemaForBlockletWithOutFilePath;
       }
-      return taskSummarySchemaForBlocklet;
     }
 
     public CarbonRowSchema[] getBlockFileFooterEntrySchema() {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index f168761..0169c12 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -133,8 +133,11 @@ public class BlockDataMap extends CoarseGrainDataMap
     // structure
     byte[] filePath = null;
     boolean isPartitionTable = blockletDataMapInfo.getCarbonTable().isHivePartitionTable();
-    if (isPartitionTable || !blockletDataMapInfo.getCarbonTable().isTransactionalTable()
-        || blockletDataMapInfo.getCarbonTable().isSupportFlatFolder()) {
+    if (isPartitionTable || !blockletDataMapInfo.getCarbonTable().isTransactionalTable() ||
+        blockletDataMapInfo.getCarbonTable().isSupportFlatFolder() ||
+        // if the segment data is written in the table path then no need to store the whole file path
+        !blockletDataMapInfo.getFilePath().startsWith(
+            blockletDataMapInfo.getCarbonTable().getTablePath())) {
       filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
       isFilePathStored = true;
     }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1c2d50d..544de16 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -171,6 +171,66 @@ public class SegmentFileStore {
   }
 
   /**
+   * Write segment file to the metadata folder of the table
+   *
+   * @param carbonTable CarbonTable
+   * @param segmentId segment id
+   * @param UUID      a UUID string used to construct the segment file name
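+   * @param segPath   absolute segment path; if null, the default segment path under the table path is used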
+   * @return segment file name
+   */
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
+      String segPath)
+      throws IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath);
+  }
+
+
+  /**
+   * Write segment file to the metadata folder of the table.
+   *
+   * @param carbonTable CarbonTable
+   * @param segment segment whose path and options are recorded in the segment file
+   * @return true if the segment file was written, false if no index files were found
+   */
+  public static boolean writeSegmentFile(CarbonTable carbonTable, Segment 
segment)
+      throws IOException {
+    String tablePath = carbonTable.getTablePath();
+    CarbonFile segmentFolder = 
FileFactory.getCarbonFile(segment.getSegmentPath());
+    CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || 
file.getName()
+            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+      }
+    });
+    if (indexFiles != null && indexFiles.length > 0) {
+      SegmentFile segmentFile = new SegmentFile();
+      segmentFile.setOptions(segment.getOptions());
+      FolderDetails folderDetails = new FolderDetails();
+      folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+      folderDetails.setRelative(false);
+      segmentFile.addPath(segment.getSegmentPath(), folderDetails);
+      for (CarbonFile file : indexFiles) {
+        if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+          folderDetails.setMergeFileName(file.getName());
+        } else {
+          folderDetails.getFiles().add(file.getName());
+        }
+      }
+      String segmentFileFolder = 
CarbonTablePath.getSegmentFilesLocation(tablePath);
+      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
+      if (!carbonFile.exists()) {
+        carbonFile.mkdirs(segmentFileFolder);
+      }
+      // write segment info to new file.
+      writeSegmentFile(segmentFile,
+          segmentFileFolder + File.separator + segment.getSegmentFileName());
+
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Write segment file to the metadata folder of the table selecting only the 
current load files
    *
    * @param carbonTable
@@ -181,10 +241,13 @@ public class SegmentFileStore {
    * @throws IOException
    */
   public static String writeSegmentFile(CarbonTable carbonTable, String 
segmentId, String UUID,
-      final String currentLoadTimeStamp) throws IOException {
+      final String currentLoadTimeStamp, String absSegPath) throws IOException 
{
     String tablePath = carbonTable.getTablePath();
     boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
-    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+    String segmentPath = absSegPath;
+    if (absSegPath == null) {
+      segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+    }
     CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
     CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
@@ -200,7 +263,7 @@ public class SegmentFileStore {
     if (indexFiles != null && indexFiles.length > 0) {
       SegmentFile segmentFile = new SegmentFile();
       FolderDetails folderDetails = new FolderDetails();
-      folderDetails.setRelative(true);
+      folderDetails.setRelative(absSegPath == null);
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
       for (CarbonFile file : indexFiles) {
         if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -211,7 +274,11 @@ public class SegmentFileStore {
       }
       String segmentRelativePath = "/";
       if (!supportFlatFolder) {
-        segmentRelativePath = segmentPath.substring(tablePath.length(), 
segmentPath.length());
+        if (absSegPath != null) {
+          segmentRelativePath = absSegPath;
+        } else {
+          segmentRelativePath = segmentPath.substring(tablePath.length());
+        }
       }
       segmentFile.addPath(segmentRelativePath, folderDetails);
       String segmentFileFolder = 
CarbonTablePath.getSegmentFilesLocation(tablePath);
@@ -233,7 +300,6 @@ public class SegmentFileStore {
     return null;
   }
 
-
   /**
    * Move the loaded data from source folder to destination folder.
    */
@@ -326,6 +392,18 @@ public class SegmentFileStore {
    */
   public static boolean updateSegmentFile(CarbonTable carbonTable, String 
segmentId,
       String segmentFile, String tableId, SegmentFileStore segmentFileStore) 
throws IOException {
+    return updateSegmentFile(carbonTable, segmentId, segmentFile, tableId, 
segmentFileStore, null);
+  }
+
+  /**
+   * This API will update the segmentFile of a passed segment.
+   *
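+   * @param segmentStatus if not null, the segment status along with recomputed data size and
+   *                      index size is also written to the table status file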
+   * @return true if the status update is done, false otherwise
+   * @throws IOException
+   */
+  public static boolean updateSegmentFile(CarbonTable carbonTable, String 
segmentId,
+      String segmentFile, String tableId, SegmentFileStore segmentFileStore,
+      SegmentStatus segmentStatus) throws IOException {
     boolean status = false;
     String tablePath = carbonTable.getTablePath();
     String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
@@ -353,8 +431,20 @@ public class SegmentFileStore {
           // if the segments is in the list of marked for delete then update 
the status.
           if (segmentId.equals(detail.getLoadName())) {
             detail.setSegmentFile(segmentFile);
-            detail.setIndexSize(String.valueOf(CarbonUtil
-                .getCarbonIndexSize(segmentFileStore, 
segmentFileStore.getLocationMap())));
+            if (segmentStatus != null) {
+              HashMap<String, Long> dataSizeAndIndexSize =
+                  CarbonUtil.getDataSizeAndIndexSize(segmentFileStore);
+              detail.setDataSize(
+                  
dataSizeAndIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE)
+                      .toString());
+              detail.setIndexSize(
+                  
dataSizeAndIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE)
+                      .toString());
+              detail.setSegmentStatus(segmentStatus);
+            } else {
+              detail.setIndexSize(String.valueOf(CarbonUtil
+                  .getCarbonIndexSize(segmentFileStore, 
segmentFileStore.getLocationMap())));
+            }
             break;
           }
         }
@@ -1051,6 +1141,11 @@ public class SegmentFileStore {
      */
     private Map<String, FolderDetails> locationMap;
 
+    /**
+     * Segment option properties
+     */
+    private Map<String, String> options;
+
     SegmentFile() {
       locationMap = new HashMap<>();
     }
@@ -1086,6 +1181,13 @@ public class SegmentFileStore {
       locationMap.put(path, details);
     }
 
+    public Map<String, String> getOptions() {
+      return options;
+    }
+
+    public void setOptions(Map<String, String> options) {
+      this.options = options;
+    }
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index 05f616f..4b79eb6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -17,16 +17,33 @@
 
 package org.apache.carbondata.core.statusmanager;
 
+import java.io.Serializable;
+import java.util.Objects;
+
 /**
  * The data file format supported in carbondata project
  */
-public enum FileFormat {
+public class FileFormat implements Serializable {
+
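+  // carbondata columnar file format, optimized for read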
+  public static final FileFormat COLUMNAR_V3 = new FileFormat("COLUMNAR_V3", 0);
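+  // carbondata row file format, optimized for write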
+  public static final FileFormat ROW_V1 = new FileFormat("ROW_V1", 1);
+
+  private String format;
+
+  private int ordinal;
 
-  // carbondata columnar file format, optimized for read
-  COLUMNAR_V3,
+  public FileFormat(String format) {
+    this.format = format;
+  }
+
+  public FileFormat(String format, int ordinal) {
+    this.format = format;
+    this.ordinal = ordinal;
+  }
 
-  // carbondata row file format, optimized for write
-  ROW_V1;
+  public int ordinal() {
+    return ordinal;
+  }
 
   public static FileFormat getByOrdinal(int ordinal) {
     switch (ordinal) {
@@ -38,4 +55,19 @@ public enum FileFormat {
 
     return COLUMNAR_V3;
   }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    FileFormat that = (FileFormat) o;
+    return Objects.equals(format, that.format);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(format);
+  }
+
+  @Override public String toString() {
+    return format;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 54a6d0f..17a48c5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -122,7 +122,12 @@ public class LoadMetadataDetails implements Serializable {
   /**
    * the file format of this segment
    */
-  private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
+  private String fileFormat = FileFormat.COLUMNAR_V3.toString();
+
+  /**
+   * Segment path if the segment is added externally.
+   */
+  private String path;
 
   /**
    * Segment file name where it has the information of partition information.
@@ -420,11 +425,11 @@ public class LoadMetadataDetails implements Serializable {
   }
 
   public FileFormat getFileFormat() {
-    return fileFormat;
+    return new FileFormat(fileFormat);
   }
 
   public void setFileFormat(FileFormat fileFormat) {
-    this.fileFormat = fileFormat;
+    this.fileFormat = fileFormat.toString();
   }
 
   public String getSegmentFile() {
@@ -447,4 +452,12 @@ public class LoadMetadataDetails implements Serializable {
   public void setExtraInfo(String extraInfo) {
     this.extraInfo = extraInfo;
   }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index bc794f4..601abcd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -48,7 +48,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
-import org.apache.carbondata.core.mutate.TupleIdEnum;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -252,49 +251,23 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String 
segmentId) throws Exception {
-    String blockId =
-        CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true, 
isStandardTable);
-    String tupleId;
-    if (!isStandardTable) {
-      tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
-    } else {
-      tupleId = CarbonTablePath.getShortBlockId(blockId);
-    }
-    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    CarbonFile file = FileFactory.getCarbonFile(blockFilePath);
+    return getDeltaFiles(file, segmentId)
         .toArray(new String[0]);
   }
 
   /**
    * Returns all delta file paths of specified block
    */
-  private List<String> getDeltaFiles(String tupleId, String extension)
-      throws Exception {
-    String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, 
TupleIdEnum.SEGMENT_ID);
-    String completeBlockName = CarbonTablePath.addDataPartPrefix(
-        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
-            + CarbonCommonConstants.FACT_FILE_EXT);
-
-    String blockPath;
-    if (!isStandardTable) {
-      blockPath = identifier.getTablePath() + 
CarbonCommonConstants.FILE_SEPARATOR
-          + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, 
TupleIdEnum.PART_ID)
-          .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + 
completeBlockName;
-    } else {
-      String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
-          identifier.getTablePath(), segment);
-      blockPath =
-          carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + 
completeBlockName;
-    }
-    CarbonFile file = FileFactory.getCarbonFile(blockPath, 
FileFactory.getFileType(blockPath));
-    if (!file.exists()) {
-      throw new Exception("Invalid tuple id " + tupleId);
-    }
+  private List<String> getDeltaFiles(CarbonFile file, String segmentId) throws 
Exception {
+    String completeBlockName = file.getName();
     String blockNameWithoutExtn =
         completeBlockName.substring(0, completeBlockName.lastIndexOf('.'));
     //blockName without timestamp
     final String blockNameFromTuple =
         blockNameWithoutExtn.substring(0, 
blockNameWithoutExtn.lastIndexOf("-"));
-    return getDeltaFiles(file, blockNameFromTuple, extension, segment);
+    return getDeltaFiles(file, blockNameFromTuple, 
CarbonCommonConstants.DELETE_DELTA_FILE_EXT,
+        segmentId);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index cb23e3e..19d142e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2595,7 +2595,7 @@ public final class CarbonUtil {
   }
 
   // Get the total size of carbon data and the total size of carbon index
-  private static HashMap<String, Long> 
getDataSizeAndIndexSize(SegmentFileStore fileStore)
+  public static HashMap<String, Long> getDataSizeAndIndexSize(SegmentFileStore 
fileStore)
       throws IOException {
     long carbonDataSize = 0L;
     long carbonIndexSize = 0L;
@@ -2633,15 +2633,25 @@ public final class CarbonUtil {
       Set<String> carbonindexFiles = folderDetails.getFiles();
       String mergeFileName = folderDetails.getMergeFileName();
       if (null != mergeFileName) {
-        String mergeIndexPath =
-            fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
-                + mergeFileName;
+        String mergeIndexPath;
+        if (entry.getValue().isRelative()) {
+          mergeIndexPath =
+              fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
+                  + mergeFileName;
+        } else {
+          mergeIndexPath = entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR + mergeFileName;
+        }
         carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize();
       }
       for (String indexFile : carbonindexFiles) {
-        String indexPath =
-            fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
-                + indexFile;
+        String indexPath;
+        if (entry.getValue().isRelative()) {
+          indexPath =
+              fileStore.getTablePath() + entry.getKey() + 
CarbonCommonConstants.FILE_SEPARATOR
+                  + indexFile;
+        } else {
+          indexPath = entry.getKey() + CarbonCommonConstants.FILE_SEPARATOR + 
indexFile;
+        }
         carbonIndexSize += FileFactory.getCarbonFile(indexPath).getSize();
       }
     }
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index c40b065..31f22a7 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -17,13 +17,16 @@
 package org.apache.carbondata.mv.rewrite
 
 import java.io.File
+import java.nio.file.{Files, Paths}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -1242,6 +1245,117 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table IF EXISTS maintable")
   }
 
+  test("test create datamap with add segment") {
+    sql("drop table if exists fact_table_addseg")
+    sql("drop table if exists fact_table_addseg1")
+    sql(
+      """
+        | CREATE TABLE fact_table_addseg (empname String, designation String, 
doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table_addseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(
+      """
+        | CREATE TABLE fact_table_addseg1 (empname String, designation String, 
doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table_addseg1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("drop datamap if exists datamap_addseg")
+    sql("create datamap datamap_addseg using 'mv' as select empname, 
designation from fact_table_addseg")
+    val df = sql("select empname,designation from fact_table_addseg")
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_addseg"))
+    assert(df.collect().length == 90)
+    val table = CarbonEnv.getCarbonTable(None, "fact_table_addseg1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+    val newPath = storeLocation + "/" + "addsegtest"
+    copy(path, newPath)
+
+    sql(s"alter table fact_table_addseg add segment options('path'='$newPath', 
'format'='carbon')").show()
+    sql("select empname,designation from fact_table_addseg").show()
+    val df1 = sql("select empname,designation from fact_table_addseg")
+    val analyzed1 = df1.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed1, "datamap_addseg"))
+    assert(df1.collect().length == 180)
+    sql(s"drop datamap datamap_addseg")
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    sql("drop table if exists fact_table_addseg")
+    sql("drop table if exists fact_table_addseg1")
+  }
+
+  test("test create datamap with add segment with deferred rebuild") {
+    sql("drop table if exists fact_table_addseg")
+    sql("drop table if exists fact_table_addseg1")
+    sql(
+      """
+        | CREATE TABLE fact_table_addseg (empname String, designation String, 
doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table_addseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(
+      """
+        | CREATE TABLE fact_table_addseg1 (empname String, designation String, 
doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table_addseg1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("drop datamap if exists datamap_addseg")
+    sql("create datamap datamap_addseg using 'mv' WITH DEFERRED REBUILD as 
select empname, designation from fact_table_addseg")
+    sql("rebuild datamap datamap_addseg")
+    val df = sql("select empname,designation from fact_table_addseg")
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_addseg"))
+    assert(df.collect().length == 90)
+    val table = CarbonEnv.getCarbonTable(None, "fact_table_addseg1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+    val newPath = storeLocation + "/" + "addsegtest"
+    copy(path, newPath)
+
+    sql(s"alter table fact_table_addseg add segment options('path'='$newPath', 
'format'='carbon')").show()
+    val df1 = sql("select empname,designation from fact_table_addseg")
+    val analyzed1 = df1.queryExecution.analyzed
+    assert(!TestUtil.verifyMVDataMap(analyzed1, "datamap_addseg"))
+    assert(df1.collect().length == 180)
+
+    sql("rebuild datamap datamap_addseg")
+
+    val df2 = sql("select empname,designation from fact_table_addseg")
+    val analyzed2 = df2.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed2, "datamap_addseg"))
+    assert(df2.collect().length == 180)
+
+    sql(s"drop datamap datamap_addseg")
+    sql("drop table if exists fact_table_addseg")
+    sql("drop table if exists fact_table_addseg1")
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+
+
+  def copy(oldLoc: String, newLoc: String): Unit = {
+    val oldFolder = FileFactory.getCarbonFile(oldLoc)
+    FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
+    val oldFiles = oldFolder.listFiles
+    for (file <- oldFiles) {
+      Files.copy(Paths.get(file.getParentFile.getPath, file.getName), 
Paths.get(newLoc, file.getName))
+    }
+  }
+
   override def afterAll {
     drop()
     CarbonProperties.getInstance()
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
new file mode 100644
index 0000000..50927ee
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.addsegment
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+
+  }
+
+  test("Test add segment ") {
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("select count(*) from addsegment1").show()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    copy(path, newPath)
+    sql("delete from table addsegment1 where segment.id in (1)")
+    sql("clean files for table addsegment1")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='carbon')").show()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+  test("Test added segment drop") {
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("select count(*) from addsegment1").show()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    copy(path, newPath)
+    sql("delete from table addsegment1 where segment.id in (1)")
+    sql("clean files for table addsegment1")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='carbon')").show()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+    sql("delete from table addsegment1 where segment.id in (2)")
+    sql("clean files for table addsegment1")
+    val oldFolder = FileFactory.getCarbonFile(newPath)
+    assert(oldFolder.listFiles.length == 0, "Added segment path should be 
deleted when clean files are called")
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+  test("Test compact on added segment") {
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("select count(*) from addsegment1").show()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    copy(path, newPath)
+    sql("delete from table addsegment1 where segment.id in (1)")
+    sql("clean files for table addsegment1")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='carbon')").show()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+    sql("alter table addsegment1 compact 'major'").show()
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+    sql("clean files for table addsegment1")
+    val oldFolder = FileFactory.getCarbonFile(newPath)
+    assert(oldFolder.listFiles.length == 0, "Added segment path should be 
deleted when clean files are called")
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+  test("Test compact on multiple added segments") {
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("select count(*) from addsegment1").show()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    for (i <- 0 until 10) {
+      copy(path, newPath+i)
+    }
+
+    sql("delete from table addsegment1 where segment.id in (1)")
+    sql("clean files for table addsegment1")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+    for (i <- 0 until 10) {
+      sql(s"alter table addsegment1 add segment options('path'='${newPath+i}', 
'format'='carbon')").show()
+
+    }
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(110)))
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(110)))
+    sql("alter table addsegment1 compact 'minor'").show()
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(110)))
+    sql("clean files for table addsegment1")
+    val oldFolder = FileFactory.getCarbonFile(newPath)
+    assert(oldFolder.listFiles.length == 0, "Added segment path should be 
deleted when clean files are called")
+    for (i <- 0 until 10) {
+      FileFactory.deleteAllFilesOfDir(new File(newPath+i))
+    }
+  }
+
+
+  test("Test update on added segment") {
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("select count(*) from addsegment1").show()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    copy(path, newPath)
+    sql("delete from table addsegment1 where segment.id in (1)")
+    sql("clean files for table addsegment1")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='carbon')").show()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+    sql("""update addsegment1 d  set (d.empname) = ('ravi') where d.empname = 
'arvind'""").show()
+    checkAnswer(sql("select count(*) from addsegment1 where empname='ravi'"), 
Seq(Row(2)))
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(20)))
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+  test("Test validation on added segment") {
+    sql("drop table if exists addsegment1")
+    sql("drop table if exists addsegment2")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(
+      """
+        | CREATE TABLE addsegment2 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Date,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("select count(*) from addsegment1").show()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment2") 
(sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "0")
+    val newPath = storeLocation + "/" + "addsegtest"
+    copy(path, newPath)
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+    val ex = intercept[Exception] {
+      sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='carbon')").show()
+    }
+    assert(ex.getMessage.contains("Schema is not same"))
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+  def copy(oldLoc: String, newLoc: String): Unit = {
+    val oldFolder = FileFactory.getCarbonFile(oldLoc)
+    FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
+    val oldFiles = oldFolder.listFiles
+    for (file <- oldFiles) {
+      Files.copy(Paths.get(file.getParentFile.getPath, file.getName), 
Paths.get(newLoc, file.getName))
+    }
+  }
+
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists addsegment1")
+    sql("drop table if exists addsegment2")
+  }
+
+}
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 0544d22..f348da7 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -98,7 +98,7 @@ object CarbonStore {
               new java.sql.Timestamp(load.getLoadEndTime).toString
             }
 
-          val (dataSize, indexSize) = if (load.getFileFormat == 
FileFormat.ROW_V1) {
+          val (dataSize, indexSize) = if 
(load.getFileFormat.equals(FileFormat.ROW_V1)) {
             // for streaming segment, we should get the actual size from the 
index file
             // since it is continuously inserting data
             val segmentDir = CarbonTablePath.getSegmentPath(tablePath, 
load.getLoadName)
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 73aba46..cc7ba5a 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -146,7 +146,7 @@ class CarbonScanRDD[T: ClassTag](
       val streamSplits = new ArrayBuffer[InputSplit]()
       splits.asScala.foreach { split =>
         val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
-        if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
+        if (FileFormat.ROW_V1.equals(carbonInputSplit.getFileFormat)) {
           streamSplits += split
         } else {
           columnarSplits.add(split)
@@ -428,8 +428,9 @@ class CarbonScanRDD[T: ClassTag](
       // one query id per table
       model.setQueryId(queryId)
       // get RecordReader by FileFormat
-      var reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
-        case FileFormat.ROW_V1 =>
+
+      var reader: RecordReader[Void, Object] =
+        if (inputSplit.getFileFormat.equals(FileFormat.ROW_V1)) {
           // create record reader for row format
           DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
           val inputFormat = new CarbonStreamInputFormat
@@ -441,7 +442,7 @@ class CarbonScanRDD[T: ClassTag](
           val streamReader = inputFormat.createRecordReader(inputSplit, 
attemptContext)
             .asInstanceOf[RecordReader[Void, Object]]
           streamReader
-        case _ =>
+        } else {
           // create record reader for CarbonData file format
           if (vectorReader) {
             model.setDirectVectorFill(directFill)
@@ -461,7 +462,7 @@ class CarbonScanRDD[T: ClassTag](
               format.getReadSupportClass(attemptContext.getConfiguration),
               inputMetricsStats, attemptContext.getConfiguration)
           }
-      }
+        }
 
       val closeReader = () => {
         if (reader != null) {
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index c331532..90dfc7c 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -191,6 +191,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val STREAMS = carbonKeyWord("STREAMS")
   protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES")
   protected val CARBONCLI = carbonKeyWord("CARBONCLI")
+  protected val PATH = carbonKeyWord("PATH")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 74c9140..b289045 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -618,7 +618,7 @@ object CarbonDataRDDFactory {
   /**
    * clear datamap files for segment
    */
-  private def clearDataMapFiles(carbonTable: CarbonTable, segmentId: String): 
Unit = {
+  def clearDataMapFiles(carbonTable: CarbonTable, segmentId: String): Unit = {
     try {
       val segments = List(new Segment(segmentId)).asJava
       DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
@@ -646,14 +646,16 @@ object CarbonDataRDDFactory {
       SegmentStatusManager.readTableStatusFile(
         CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val segmentFiles = segmentDetails.asScala.map { seg =>
-      val segmentFile =
-        
metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get.getSegmentFile
+      val load =
+        metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get
+      val segmentFile = load.getSegmentFile
       var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
 
       val file = SegmentFileStore.writeSegmentFile(
         carbonTable,
         seg.getSegmentNo,
-        String.valueOf(System.currentTimeMillis()))
+        String.valueOf(System.currentTimeMillis()),
+        load.getPath)
 
       if (segmentFile != null) {
         segmentFiles ++= FileFactory.getCarbonFile(
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
new file mode 100644
index 0000000..6bc122c
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.io.File
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import 
org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
+import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.types.StructType
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{FileFormat, 
LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, 
BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
+import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
 LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, 
LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.clearDataMapFiles
+
+
+/**
+ * A user can add an external data folder as a segment to a transactional table.
+ * For an external carbon data folder there is no need to specify the format in the options,
+ * but for other formats such as parquet the user must specify format=parquet in the options.
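+ *
+ * A usage sketch (table name and paths are illustrative):
+ *   ALTER TABLE addsegment1 ADD SEGMENT
+ *     OPTIONS('path'='/tmp/external_carbon_seg', 'format'='carbon')
+ *   ALTER TABLE addsegment1 ADD SEGMENT
+ *     OPTIONS('path'='/tmp/external_parquet_seg', 'format'='parquet')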
+ */
+case class CarbonAddLoadCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String])
+  extends MetadataCommand {
+
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val relation = CarbonEnv
+      .getInstance(sparkSession)
+      .carbonMetaStore
+      .lookupRelation(databaseNameOp, tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    val tableSchema = StructType.fromAttributes(relation.output)
+    val carbonTable = relation.carbonTable
+    setAuditTable(carbonTable)
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
+    }
+
+    // if insert overwrite in progress, do not allow add segment
+    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "add segment")
+    }
+    val segmentPath = options.getOrElse(
+      "path", throw new UnsupportedOperationException("PATH is mandatory"))
+
+    // TODO use the fileformat based on the format.
+    val segSchema = new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get
+
+    if (!tableSchema.equals(segSchema)) {
+      throw new AnalysisException(s"Schema is not the same. Table schema is: " +
+                                  s"${tableSchema} and segment schema is: ${segSchema}")
+    }
+
+    val model = new CarbonLoadModel
+    model.setCarbonTransactionalTable(true)
+    model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+    model.setDatabaseName(carbonTable.getDatabaseName)
+    model.setTableName(carbonTable.getTableName)
+    val operationContext = new OperationContext
+    val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+      new LoadTablePreExecutionEvent(
+        carbonTable.getCarbonTableIdentifier,
+        model)
+    operationContext.setProperty("isOverwrite", false)
+    OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
+    // Add pre event listener for index datamap
+    val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
+    val dataMapOperationContext = new OperationContext()
+    if (tableDataMaps.size() > 0) {
+      val dataMapNames: mutable.Buffer[String] =
+        tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+      val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+        new BuildDataMapPreExecutionEvent(sparkSession,
+          carbonTable.getAbsoluteTableIdentifier, dataMapNames)
+      OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
+        dataMapOperationContext)
+    }
+
+    val newLoadMetaEntry = new LoadMetadataDetails
+    model.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime)
+    CarbonLoaderUtil.populateNewLoadMetaEntry(newLoadMetaEntry,
+      SegmentStatus.INSERT_IN_PROGRESS,
+      model.getFactTimeStamp,
+      false)
+    newLoadMetaEntry.setPath(segmentPath)
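+    // Record the file format only for non-carbon formats; for 'carbondata'/'carbon'
+    // the load metadata entry keeps its default file format.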
+    val format = options.getOrElse("format", "carbondata")
+    if (!(format.equals("carbondata") || format.equals("carbon"))) {
+      newLoadMetaEntry.setFileFormat(new FileFormat(format))
+    }
+
+    CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false)
+    val segment = new Segment(model.getSegmentId,
+      SegmentFileStore.genSegmentFileName(
+        model.getSegmentId,
+        System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
+      segmentPath,
+      new util.HashMap[String, String](options.asJava))
+    val writeSegment =
+      SegmentFileStore.writeSegmentFile(carbonTable, segment)
+
+    operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
+      model.getSegmentId)
+    val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+      new LoadTablePreStatusUpdateEvent(
+        carbonTable.getCarbonTableIdentifier,
+        model)
+    OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
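+    // Promote the segment to SUCCESS only if the segment file could be written;
+    // otherwise fall through to the failure handling below.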
+    val success = if (writeSegment) {
+      SegmentFileStore.updateSegmentFile(
+        carbonTable,
+        model.getSegmentId,
+        segment.getSegmentFileName,
+        carbonTable.getCarbonTableIdentifier.getTableId,
+        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName),
+        SegmentStatus.SUCCESS)
+    } else {
+      false
+    }
+
+    val postExecutionEvent = if (success) {
+      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+        new LoadTablePostStatusUpdateEvent(model)
+      val commitComplete = try {
+        OperationListenerBus.getInstance()
+          .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+        true
+      } catch {
+        case ex: Exception =>
+          LOGGER.error("Problem while committing data maps", ex)
+          false
+      }
+      commitComplete
+    } else {
+      success
+    }
+
+    if (!postExecutionEvent || !success) {
+      CarbonLoaderUtil.updateTableStatusForFailure(model, "uniqueTableStatusId")
+      LOGGER.info("********starting clean up**********")
+      // delete segment is applicable for transactional table
+      CarbonLoaderUtil.deleteSegment(model, model.getSegmentId.toInt)
+      // delete corresponding segment file from metadata
+      val segmentFile = CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
+                        File.separator + segment.getSegmentFileName
+      FileFactory.deleteFile(segmentFile, FileFactory.getFileType(segmentFile))
+      clearDataMapFiles(carbonTable, model.getSegmentId)
+      LOGGER.info("********clean up done**********")
+      LOGGER.error("Data load failed due to failure in table status update.")
+      throw new Exception("Data load failed due to failure in table status update.")
+    }
+    DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
+    val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+      new LoadTablePostExecutionEvent(
+        carbonTable.getCarbonTableIdentifier,
+        model)
+    OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+    if (tableDataMaps.size() > 0) {
+      val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
+        carbonTable.getAbsoluteTableIdentifier, null, Seq(model.getSegmentId), false)
+      OperationListenerBus.getInstance()
+        .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+    }
+    Seq.empty
+  }
+
+  override protected def opName: String = "ADD SEGMENT WITH PATH"
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index cb86cb5..08b0dd0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -21,6 +21,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
@@ -130,14 +131,14 @@ object DeleteExecution {
           var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]()
           while (records.hasNext) {
             val ((key), (rowCountDetailsVO, groupedRows)) = records.next
-            val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
             result = result ++
                      deleteDeltaFunc(index,
                        key,
                        groupedRows.toIterator,
                        timestamp,
                        rowCountDetailsVO,
-                       isStandardTable)
+                       isStandardTable,
+                       metadataDetails)
           }
           result
         }).collect()
@@ -217,7 +218,8 @@ object DeleteExecution {
         iter: Iterator[Row],
         timestamp: String,
         rowCountDetailsVO: RowCountDetailsVO,
-        isStandardTable: Boolean
+        isStandardTable: Boolean,
+        loads: Array[LoadMetadataDetails]
     ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = {
 
       val result = new DeleteDelataResultImpl()
@@ -228,6 +230,7 @@ object DeleteExecution {
         .getBlockName(
           CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
       val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
+      val load = loads.find(l => l.getLoadName.equalsIgnoreCase(segmentId)).get
       val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
       val resultIter =
         new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] {
@@ -255,7 +258,11 @@ object DeleteExecution {
           }
 
           val blockPath =
-            CarbonUpdateUtil.getTableBlockPath(TID, tablePath, isStandardTable)
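+            // Segments added from an external location keep their own path in the
+            // load metadata; prefer it over the standard table block path.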
+            if (StringUtils.isNotEmpty(load.getPath)) {
+              load.getPath
+            } else {
+              CarbonUpdateUtil.getTableBlockPath(TID, tablePath, isStandardTable)
+            }
           val completeBlockName = CarbonTablePath
             .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
                                CarbonCommonConstants.FACT_FILE_EXT)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 22548ff..8670b13 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -81,7 +81,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     cacheManagement | alterDataMap
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
-    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew | addLoad
 
   protected lazy val restructure: Parser[LogicalPlan] =
     alterTableColumnRenameAndModifyDataType | alterTableDropColumn | alterTableAddColumns
@@ -482,6 +482,16 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         }
     }
 
+  /**
+   * ALTER TABLE <db.tableName> ADD SEGMENT OPTIONS('path'='path', 'key'='value')
+   */
+  protected lazy val addLoad: Parser[LogicalPlan] =
+    ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~
+    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ tableName ~ segment ~ optionsList =>
+        CarbonAddLoadCommand(dbName, tableName, optionsList.toMap)
+    }
+
   protected lazy val cleanFiles: Parser[LogicalPlan] =
     CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
       case databaseName ~ tableName =>
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 8cdb6af..408c961 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -46,6 +46,8 @@ public class CarbonDataLoadConfiguration {
 
   private BucketingInfo bucketingInfo;
 
+  private String segmentPath;
+
   private Map<String, Object> dataLoadProperties = new HashMap<>();
 
   /**
@@ -444,4 +446,12 @@ public class CarbonDataLoadConfiguration {
   public void setNumberOfLoadingCores(int numberOfLoadingCores) {
     this.numberOfLoadingCores = numberOfLoadingCores;
   }
+
+  public String getSegmentPath() {
+    return segmentPath;
+  }
+
+  public void setSegmentPath(String segmentPath) {
+    this.segmentPath = segmentPath;
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 2cb3895..520943e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
@@ -220,6 +221,16 @@ public final class DataLoadProcessBuilder {
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
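+    // If this segment was added with an external path (ADD SEGMENT), carry the path
+    // into the load configuration so the data handler writes to that location.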
+    List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
+    if (loadMetadataDetails != null) {
+      for (LoadMetadataDetails detail : loadMetadataDetails) {
+        if (detail.getLoadName().equals(loadModel.getSegmentId()) && StringUtils
+            .isNotEmpty(detail.getPath())) {
+          configuration.setSegmentPath(detail.getPath());
+        }
+      }
+    }
+
     configuration.setTaskNo(loadModel.getTaskNo());
     String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
     loadModel.getComplexDelimiters().toArray(complexDelimiters);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 39a83d9..28c5fac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -493,9 +493,13 @@ public class CarbonFactDataHandlerModel {
     if (!configuration.isCarbonTransactionalTable()) {
       carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath();
     } else {
-      carbonDataDirectoryPath = CarbonTablePath
-          .getSegmentPath(absoluteTableIdentifier.getTablePath(),
-              configuration.getSegmentId() + "");
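+      // An externally added segment carries its own path; otherwise fall back to the
+      // standard segment directory under the table path.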
+      if (configuration.getSegmentPath() != null) {
+        carbonDataDirectoryPath = configuration.getSegmentPath();
+      } else {
+        carbonDataDirectoryPath = CarbonTablePath
+            .getSegmentPath(absoluteTableIdentifier.getTablePath(),
+                configuration.getSegmentId() + "");
+      }
     }
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index ba3d64a..8c3fd14 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -86,7 +86,7 @@ public class StreamSegment {
                 CarbonTablePath.getMetadataPath(table.getTablePath()));
         LoadMetadataDetails streamSegment = null;
         for (LoadMetadataDetails detail : details) {
-          if (FileFormat.ROW_V1 == detail.getFileFormat()) {
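+          // FileFormat is now instantiated per format (see new FileFormat(format) in this
+          // patch), so compare by equals() rather than by reference.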
+          if (FileFormat.ROW_V1.equals(detail.getFileFormat())) {
             if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
               streamSegment = detail;
               break;
