http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 2a69f0d..a4d3d2b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -272,7 +272,7 @@ public class CarbonCompactionUtil {
   public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
       List<CarbonTableIdentifier> skipList) {
     for (CarbonTable ctable : carbonTables) {
-      String metadataPath = ctable.getMetaDataFilepath();
+      String metadataPath = ctable.getMetadataPath();
       // check for the compaction required file and at the same time exclude the tables which are
       // present in the skip list.
       if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c141636..89326a3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -169,15 +169,13 @@ public final class CarbonDataMergerUtil {
     // End Timestamp.
 
     // Table Update Status Metadata Update.
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(identifier);
 
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
 
     ICarbonLock updateLock = 
segmentUpdateStatusManager.getTableUpdateStatusLock();
     ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
@@ -224,7 +222,7 @@ public final class CarbonDataMergerUtil {
           }
 
           LoadMetadataDetails[] loadDetails =
-              segmentStatusManager.readLoadMetadata(metaDataFilepath);
+              SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
           for (LoadMetadataDetails loadDetail : loadDetails) {
             if (loadsToMerge.contains(loadDetail)) {
@@ -237,18 +235,18 @@ public final class CarbonDataMergerUtil {
             }
           }
 
-          segmentUpdateStatusManager
-              .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
-          segmentStatusManager
-              
.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), 
loadDetails);
+          segmentUpdateStatusManager.writeLoadDetailsIntoFile(
+              Arrays.asList(updateLists), timestamp);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails);
           status = true;
         } else {
           LOGGER.error("Not able to acquire the lock.");
           status = false;
         }
       } catch (IOException e) {
-        LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
-            .getMetadataDirectoryPath());
+        LOGGER.error("Error while updating metadata. The metadata file path is " +
+            CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         status = false;
 
       } finally {
@@ -284,9 +282,9 @@ public final class CarbonDataMergerUtil {
       String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel 
carbonLoadModel,
       CompactionType compactionType, String segmentFile) throws IOException {
     boolean tableStatusUpdationStatus = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -295,10 +293,7 @@ public final class CarbonDataMergerUtil {
         LOGGER.info("Acquired lock for the table " + 
carbonLoadModel.getDatabaseName() + "."
             + carbonLoadModel.getTableName() + " for table status updation ");
 
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+        String statusFilePath = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
         LoadMetadataDetails[] loadDetails = 
SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
@@ -617,8 +612,6 @@ public final class CarbonDataMergerUtil {
       // variable to store one  segment size across partition.
       long sizeOfOneSegmentAcrossPartition;
       if (segment.getSegmentFile() != null) {
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(carbonTable.getTablePath(), 
carbonTable.getCarbonTableIdentifier());
         sizeOfOneSegmentAcrossPartition = CarbonUtil
             .getSizeOfSegment(carbonTablePath, new Segment(segId, 
segment.getSegmentFile()));
       } else {
@@ -662,35 +655,17 @@ public final class CarbonDataMergerUtil {
   /**
    * For calculating the size of the specified segment
    * @param tablePath the store path of the segment
-   * @param tableIdentifier identifier of table that the segment belong to
    * @param segId segment id
    * @return the data size of the segment
    */
-  private static long getSizeOfSegment(String tablePath,
-      CarbonTableIdentifier tableIdentifier, String segId) {
-    String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
+  private static long getSizeOfSegment(String tablePath, String segId) {
+    String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId);
     CarbonFile segmentFolder =
         FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
     return getSizeOfFactFileInLoad(segmentFolder);
   }
 
   /**
-   * This method will get the store location for the given path, segemnt id 
and partition id
-   *
-   * @param tablePath
-   * @param carbonTableIdentifier identifier of catbon table that the segment 
belong to
-   * @param segmentId segment id
-   * @return the store location of the segment
-   */
-  private static String getStoreLocation(String tablePath,
-      CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
-    return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
-  }
-
-
-  /**
    * Identify the segments to be merged based on the segment count
    *
    * @param listOfSegmentsAfterPreserve the list of segments after
@@ -1033,7 +1008,7 @@ public final class CarbonDataMergerUtil {
    * if UpdateDelta Files are more than IUD Compaction threshold.
    *
    * @param seg
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param segmentUpdateStatusManager
    * @param numberDeltaFilesThreshold
    * @return
@@ -1045,9 +1020,7 @@ public final class CarbonDataMergerUtil {
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
-    String segmentPath = 
carbonTablePath.getCarbonDataDirectoryPath(seg.getSegmentNo());
+    String segmentPath = 
CarbonTablePath.getSegmentPath(absoluteTableIdentifier.getTablePath(), 
seg.getSegmentNo());
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, 
FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();
@@ -1295,15 +1268,12 @@ public final class CarbonDataMergerUtil {
     CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, 
timestamp, true);
 
     // Update the Table Status.
-    String metaDataFilepath = table.getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier = 
table.getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
+    String metaDataFilepath = table.getMetadataPath();
+    AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
 
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    String tableStatusPath = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -1317,7 +1287,7 @@ public final class CarbonDataMergerUtil {
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+                SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
           if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
@@ -1326,7 +1296,7 @@ public final class CarbonDataMergerUtil {
           }
         }
         try {
-          segmentStatusManager
+          SegmentStatusManager
                   .writeLoadDetailsIntoFile(tableStatusPath, 
listOfLoadFolderDetailsArray);
         } catch (IOException e) {
           return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 732a7e8..5062a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -337,9 +336,8 @@ public class CarbonFactDataHandlerModel {
       return configuration.getDataWritePath();
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = 
configuration.getTableIdentifier();
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String carbonDataDirectoryPath = carbonTablePath
-        .getCarbonDataDirectoryPath(configuration.getSegmentId() + "");
+    String carbonDataDirectoryPath = CarbonTablePath
+        .getSegmentPath(absoluteTableIdentifier.getTablePath(), 
configuration.getSegmentId() + "");
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index e319160..2c08c18 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -143,12 +142,9 @@ public final class CarbonDataProcessorUtil {
     String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, 
File.pathSeparator);
     String[] localDataFolderLocArray = new 
String[baseTmpStorePathArray.length];
 
-    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
       String tmpStore = baseTmpStorePathArray[i];
-      CarbonTablePath carbonTablePath =
-          CarbonStorePath.getCarbonTablePath(tmpStore, 
carbonTable.getCarbonTableIdentifier());
-      String carbonDataDirectoryPath = 
carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+      String carbonDataDirectoryPath = 
CarbonTablePath.getSegmentPath(tmpStore, segmentId);
 
       localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + 
taskId;
     }
@@ -376,16 +372,12 @@ public final class CarbonDataProcessorUtil {
    * @return data directory path
    */
   public static String createCarbonStoreLocation(String factStoreLocation,
-      String databaseName, String tableName, String partitionId, String 
segmentId) {
+      String databaseName, String tableName, String segmentId) {
     CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    CarbonTableIdentifier carbonTableIdentifier = 
carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(factStoreLocation, 
carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath(segmentId);
-    return carbonDataDirectoryPath;
+    return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), 
segmentId);
   }
 
+
   /**
    * initialise data type for measures for their storage format
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index c135a88..e7c52f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -55,7 +54,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.merger.NodeBlockRelation;
@@ -74,11 +72,8 @@ public final class CarbonLoaderUtil {
   }
 
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
-    CarbonTable carbonTable = 
loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getTablePath(), 
carbonTable.getCarbonTableIdentifier());
-
-    String segmentPath = 
carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
     deleteStorePath(segmentPath);
   }
 
@@ -91,33 +86,26 @@ public final class CarbonLoaderUtil {
    */
   public static boolean isValidSegment(CarbonLoadModel loadModel,
       int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
-        .getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     int fileCount = 0;
-    int partitionCount = carbonTable.getPartitionCount();
-    for (int i = 0; i < partitionCount; i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
-          currentLoad + "");
-      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
-          FileFactory.getFileType(segmentPath));
-      CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-
-        @Override
-        public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(
-              CarbonTablePath.getCarbonIndexExtension())
-              || file.getName().endsWith(
-              CarbonTablePath.getCarbonDataExtension());
-        }
-
-      });
-      fileCount += files.length;
-      if (files.length > 0) {
-        return true;
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+        FileFactory.getFileType(segmentPath));
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+      @Override
+      public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(
+            CarbonTablePath.getCarbonIndexExtension())
+            || file.getName().endsWith(
+            CarbonTablePath.getCarbonDataExtension());
       }
+
+    });
+    fileCount += files.length;
+    if (files.length > 0) {
+      return true;
     }
     if (fileCount == 0) {
       return false;
@@ -183,21 +171,20 @@ public final class CarbonLoaderUtil {
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean 
insertOverwrite, String uuid,
       List<Segment> segmentsToBeDeleted, List<Segment> 
segmentFilesTobeUpdated) throws IOException {
     boolean status = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+    String metadataPath = 
CarbonTablePath.getMetadataPath(identifier.getTablePath());
     FileType fileType = FileFactory.getFileType(metadataPath);
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType);
     }
     String tableStatusPath;
     if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() 
&& !uuid.isEmpty()) {
-      tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+      tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(uuid);
     } else {
-      tableStatusPath = carbonTablePath.getTableStatusFilePath();
+      tableStatusPath = CarbonTablePath.getTableStatusFilePath();
     }
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     int retryCount = CarbonLockUtil
         
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
@@ -211,7 +198,8 @@ public final class CarbonLoaderUtil {
             "Acquired lock for table" + loadModel.getDatabaseName() + "." + 
loadModel.getTableName()
                 + " for table status updation");
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            
SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         List<CarbonFile> staleFolders = new ArrayList<>();
@@ -238,12 +226,12 @@ public final class CarbonLoaderUtil {
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
             if (entry.getSegmentStatus() == 
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && SegmentStatusManager.isLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                    identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert overwrite is in 
progress");
             } else if (newMetaEntry.getSegmentStatus() == 
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
                 && SegmentStatusManager.isLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                    identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert into or load is in 
progress");
             }
           }
@@ -268,7 +256,7 @@ public final class CarbonLoaderUtil {
                 entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
                 // For insert overwrite, we will delete the old segment folder 
immediately
                 // So collect the old segments here
-                addToStaleFolders(carbonTablePath, staleFolders, entry);
+                addToStaleFolders(identifier, staleFolders, entry);
               }
             }
           }
@@ -281,7 +269,7 @@ public final class CarbonLoaderUtil {
         // when no records are inserted then newSegmentEntry will be 
SegmentStatus.MARKED_FOR_DELETE
         // so empty segment folder should be deleted
         if (newMetaEntry.getSegmentStatus() == 
SegmentStatus.MARKED_FOR_DELETE) {
-          addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+          addToStaleFolders(identifier, staleFolders, newMetaEntry);
         }
 
         for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
@@ -326,9 +314,10 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+  private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws 
IOException {
-    String path = 
carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
+    String path = CarbonTablePath.getSegmentPath(
+        identifier.getTablePath(), entry.getLoadName());
     // add to the deletion list only if file exist else HDFS file system will 
throw
     // exception while deleting the file if file path does not exist
     if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -354,11 +343,9 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(AbsoluteTableIdentifier 
absoluteTableIdentifier,
+  public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
       List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+    String dataLoadLocation = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
     DataOutputStream dataOutputStream;
     Gson gsonObjectToWrite = new Gson();
@@ -906,10 +893,8 @@ public final class CarbonLoaderUtil {
    * This method will get the store location for the given path, segment id and partition id
    */
   public static void checkAndCreateCarbonDataLocation(String segmentId, 
CarbonTable carbonTable) {
-    CarbonTableIdentifier carbonTableIdentifier = 
carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), 
carbonTableIdentifier);
-    String segmentFolder = 
carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+    String segmentFolder = CarbonTablePath.getSegmentPath(
+        carbonTable.getTablePath(), segmentId);
     CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
@@ -938,9 +923,7 @@ public final class CarbonLoaderUtil {
    */
   public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails 
loadMetadataDetails,
       String segmentId, CarbonTable carbonTable) throws IOException {
-    CarbonTablePath carbonTablePath =
-        
CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
-    Map<String, Long> dataIndexSize = 
CarbonUtil.getDataSizeAndIndexSize(carbonTablePath,
+    Map<String, Long> dataIndexSize = 
CarbonUtil.getDataSizeAndIndexSize(carbonTable.getAbsoluteTableIdentifier(),
         new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
     Long dataSize = 
dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
     loadMetadataDetails.setDataSize(String.valueOf(dataSize));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 288cd54..c00cc86 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public final class DeleteLoadFolders {
@@ -50,15 +49,14 @@ public final class DeleteLoadFolders {
   /**
    * returns segment path
    *
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param oneLoad
    * @return
    */
-  private static String getSegmentPath(AbsoluteTableIdentifier 
absoluteTableIdentifier,
+  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
       LoadMetadataDetails oneLoad) {
-    CarbonTablePath carbon = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String segmentId = oneLoad.getLoadName();
-    return carbon.getCarbonDataDirectoryPath(segmentId);
+    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), 
segmentId);
   }
 
   public static void physicalFactAndMeasureMetadataDeletion(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index cd1e28a..d30891a 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -63,30 +63,6 @@ public class BlockIndexStoreTest extends TestCase {
 
   }
 
-//  public void testLoadAndGetTaskIdToSegmentsMapForSingleSegment()
-//      throws IOException {
-//    File file = getPartFile();
-//    TableBlockInfo info =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    CarbonTableIdentifier carbonTableIdentifier =
-//            new 
CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
-//        new AbsoluteTableIdentifier("/src/test/resources", 
carbonTableIdentifier);
-//    try {
-//
-//      List<TableBlockUniqueIdentifier> tableBlockInfoList =
-//          getTableBlockUniqueIdentifierList(Arrays.asList(new 
TableBlockInfo[] { info }), absoluteTableIdentifier);
-//      List<AbstractIndex> loadAndGetBlocks = 
cache.getAll(tableBlockInfoList);
-//      assertTrue(loadAndGetBlocks.size() == 1);
-//    } catch (Exception e) {
-//      assertTrue(false);
-//    }
-//    List<String> segmentIds = new ArrayList<>();
-//      segmentIds.add(info.getSegment());
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-//  }
-//
   private List<TableBlockUniqueIdentifier> 
getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new 
ArrayList<>();
@@ -95,138 +71,6 @@ public class BlockIndexStoreTest extends TestCase {
     }
     return tableBlockUniqueIdentifiers;
   }
-//
-//  public void 
testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently()
-//      throws IOException {
-//    String canonicalPath =
-//        new File(this.getClass().getResource("/").getPath() + 
"/../../").getCanonicalPath();
-//    File file = getPartFile();
-//    TableBlockInfo info =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    TableBlockInfo info1 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//
-//    TableBlockInfo info2 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    TableBlockInfo info3 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//    TableBlockInfo info4 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V1, null);
-//
-//    CarbonTableIdentifier carbonTableIdentifier =
-//            new 
CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
-//        new AbsoluteTableIdentifier("/src/test/resources", 
carbonTableIdentifier);
-//    ExecutorService executor = Executors.newFixedThreadPool(3);
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info, info1 }),
-//        absoluteTableIdentifier));
-//    executor.submit(
-//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, 
info3, info4 }),
-//            absoluteTableIdentifier));
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info, info1 }),
-//        absoluteTableIdentifier));
-//    executor.submit(
-//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, 
info3, info4 }),
-//            absoluteTableIdentifier));
-//    executor.shutdown();
-//    try {
-//      executor.awaitTermination(1, TimeUnit.DAYS);
-//    } catch (InterruptedException e) {
-//      e.printStackTrace();
-//    }
-//    List<TableBlockInfo> tableBlockInfos =
-//        Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, 
info4 });
-//    try {
-//      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, 
absoluteTableIdentifier);
-//      List<AbstractIndex> loadAndGetBlocks = 
cache.getAll(tableBlockUniqueIdentifiers);
-//      assertTrue(loadAndGetBlocks.size() == 5);
-//    } catch (Exception e) {
-//      assertTrue(false);
-//    }
-//    List<String> segmentIds = new ArrayList<>();
-//    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-//      segmentIds.add(tableBlockInfo.getSegment());
-//    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-//  }
-//
-//  public void 
testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
-//      throws IOException {
-//    String canonicalPath =
-//        new File(this.getClass().getResource("/").getPath() + 
"/../../").getCanonicalPath();
-//    File file = getPartFile();
-//    TableBlockInfo info =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info1 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    TableBlockInfo info2 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info3 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info4 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    TableBlockInfo info5 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { 
"loclhost" },
-//            file.length(),ColumnarFormatVersion.V3, null);
-//    TableBlockInfo info6 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    TableBlockInfo info7 =
-//        new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { 
"loclhost" },
-//            file.length(), ColumnarFormatVersion.V3, null);
-//
-//    CarbonTableIdentifier carbonTableIdentifier =
-//            new 
CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
-//        new AbsoluteTableIdentifier("/src/test/resources", 
carbonTableIdentifier);
-//    ExecutorService executor = Executors.newFixedThreadPool(3);
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info, info1 }),
-//        absoluteTableIdentifier));
-//    executor.submit(
-//        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, 
info3, info4 }),
-//            absoluteTableIdentifier));
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info5, info6 }),
-//        absoluteTableIdentifier));
-//    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info7 }),
-//        absoluteTableIdentifier));
-//
-//    executor.shutdown();
-//    try {
-//      executor.awaitTermination(1, TimeUnit.DAYS);
-//    } catch (InterruptedException e) {
-//      // TODO Auto-generated catch block
-//      e.printStackTrace();
-//    }
-//    List<TableBlockInfo> tableBlockInfos = Arrays
-//        .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, 
info5, info6, info7 });
-//    try {
-//      List<TableBlockUniqueIdentifier> blockUniqueIdentifierList =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, 
absoluteTableIdentifier);
-//      List<AbstractIndex> loadAndGetBlocks = 
cache.getAll(blockUniqueIdentifierList);
-//      assertTrue(loadAndGetBlocks.size() == 8);
-//    } catch (Exception e) {
-//      assertTrue(false);
-//    }
-//    List<String> segmentIds = new ArrayList<>();
-//    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-//      segmentIds.add(tableBlockInfo.getSegment());
-//    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-//  }
 
   private class BlockLoaderThread implements Callable<Void> {
     private List<TableBlockInfo> tableBlockInfoList;
@@ -248,7 +92,7 @@ public class BlockIndexStoreTest extends TestCase {
   }
 
   private static File getPartFile() {
-    String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
+    String path = StoreCreator.getIdentifier().getTablePath()
         + "/Fact/Part0/Segment_0";
     File file = new File(path);
     File[] files = file.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 7f0aef6..d42dcde 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
@@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  */
 public class StoreCreator {
 
-  private static AbsoluteTableIdentifier absoluteTableIdentifier;
+  private static AbsoluteTableIdentifier identifier;
   private static String storePath = "";
   static {
     try {
       storePath = new File("target/store").getCanonicalPath();
       String dbName = "testdb";
       String tableName = "testtable";
-      absoluteTableIdentifier =
+      identifier =
           AbsoluteTableIdentifier.from(
               storePath + "/testdb/testtable",
               new CarbonTableIdentifier(dbName, tableName, 
UUID.randomUUID().toString()));
@@ -114,8 +113,8 @@ public class StoreCreator {
     }
   }
 
-  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
-    return absoluteTableIdentifier;
+  public static AbsoluteTableIdentifier getIdentifier() {
+    return identifier;
   }
 
   /**
@@ -134,12 +133,12 @@ public class StoreCreator {
       CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
       CarbonLoadModel loadModel = new CarbonLoadModel();
       loadModel.setCarbonDataLoadSchema(schema);
-      loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-      loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+      loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
       loadModel.setFactFilePath(factFilePath);
       loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-      loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+      loadModel.setTablePath(identifier.getTablePath());
       loadModel.setDateFormat(null);
       
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
           CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -175,9 +174,9 @@ public class StoreCreator {
 
   private static CarbonTable createTable() throws IOException {
     TableInfo tableInfo = new TableInfo();
-    
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+    
tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
-    
tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    
tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
     List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
     ArrayList<Encoding> encodings = new ArrayList<>();
     encodings.add(Encoding.DICTIONARY);
@@ -257,16 +256,13 @@ public class StoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
-        absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+        identifier.getCarbonTableIdentifier().getTableUniqueName()
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
+    tableInfo.setTablePath(identifier.getTablePath());
 
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    String schemaFilePath = carbonTablePath.getSchemaFilePath();
+    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
     String schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(schemaFilePath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
@@ -329,7 +325,7 @@ public class StoreCreator {
       writer.close();
       writer.commit();
       Dictionary dict = (Dictionary) dictCache.get(
-          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+          new DictionaryColumnUniqueIdentifier(identifier,
                          columnIdentifier, dims.get(i).getDataType()));
       CarbonDictionarySortInfoPreparator preparator =
           new CarbonDictionarySortInfoPreparator();
@@ -444,7 +440,7 @@ public class StoreCreator {
     
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
-    String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + 
File.separator
+    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + 
File.separator
         + CarbonCommonConstants.LOADMETADATA_FILENAME;
 
     DataOutputStream dataOutputStream;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7b823ac..8c9889d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
@@ -60,8 +59,6 @@ public class StreamSegment {
    * get stream segment or create new stream segment if not exists
    */
   public static String open(CarbonTable table) throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -72,7 +69,8 @@ public class StreamSegment {
                 + " for stream table get or create segment");
 
         LoadMetadataDetails[] details =
-            
SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         LoadMetadataDetails streamSegment = null;
         for (LoadMetadataDetails detail : details) {
           if (FileFormat.ROW_V1 == detail.getFileFormat()) {
@@ -97,8 +95,8 @@ public class StreamSegment {
             newDetails[i] = details[i];
           }
           newDetails[i] = newDetail;
-          SegmentStatusManager
-              .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), 
newDetails);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              CarbonTablePath.getTableStatusFilePath(table.getTablePath()), 
newDetails);
           return newDetail.getLoadName();
         } else {
           return streamSegment.getLoadName();
@@ -126,8 +124,6 @@ public class StreamSegment {
    */
   public static String close(CarbonTable table, String segmentId)
       throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -138,7 +134,8 @@ public class StreamSegment {
                 + " for stream table finish segment");
 
         LoadMetadataDetails[] details =
-            
SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         for (LoadMetadataDetails detail : details) {
           if (segmentId.equals(detail.getLoadName())) {
             detail.setLoadEndTime(System.currentTimeMillis());
@@ -162,7 +159,8 @@ public class StreamSegment {
         }
         newDetails[i] = newDetail;
         SegmentStatusManager
-            .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), 
newDetails);
+            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+                table.getTablePath()), newDetails);
         return newDetail.getLoadName();
       } else {
         LOGGER.error(
@@ -192,7 +190,7 @@ public class StreamSegment {
     try {
       if (statusLock.lockWithRetries()) {
         LoadMetadataDetails[] details =
-            
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+            
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
         boolean updated = false;
         for (LoadMetadataDetails detail : details) {
           if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
@@ -202,10 +200,8 @@ public class StreamSegment {
           }
         }
         if (updated) {
-          CarbonTablePath tablePath =
-              
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
           SegmentStatusManager.writeLoadDetailsIntoFile(
-              tablePath.getTableStatusFilePath(),
+              
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
               details);
         }
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 578f0cd..aa5ead2 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -223,7 +223,6 @@ object StreamHandoffRDD {
   ): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = carbonTable.getAbsoluteTableIdentifier
-    val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
     var continueHandoff = false
     // require handoff lock on table
     val lock = CarbonLockFactory.getCarbonLockObj(identifier, 
LockUsage.HANDOFF_LOCK)
@@ -240,7 +239,7 @@ object StreamHandoffRDD {
           try {
             if (statusLock.lockWithRetries()) {
               loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-                tablePath.getMetadataDirectoryPath)
+                CarbonTablePath.getMetadataPath(identifier.getTablePath))
             }
           } finally {
             if (null != statusLock) {
@@ -378,19 +377,16 @@ object StreamHandoffRDD {
       loadModel: CarbonLoadModel
   ): Boolean = {
     var status = false
-    val metaDataFilepath =
-      
loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
-    val identifier =
-      
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-    val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+    val metaDataFilepath = 
loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
+    val identifier = 
loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
+    val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
     val fileType = FileFactory.getFileType(metadataPath)
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType)
     }
-    val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+    val tableStatusPath = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
     val segmentStatusManager = new SegmentStatusManager(identifier)
-    val carbonLock = segmentStatusManager.getTableStatusLock()
+    val carbonLock = segmentStatusManager.getTableStatusLock
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
@@ -424,7 +420,7 @@ object StreamHandoffRDD {
         status = true
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation 
for table " + loadModel
-          .getDatabaseName() + "." + loadModel.getTableName());
+          .getDatabaseName() + "." + loadModel.getTableName())
       }
     } finally {
       if (carbonLock.unlock()) {
@@ -435,6 +431,6 @@ object StreamHandoffRDD {
                      "." + loadModel.getTableName() + " during table status 
updation")
       }
     }
-    return status
+    status
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 75fcfb0..6316d84 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
 LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -127,16 +127,14 @@ object StreamSinkFactory {
    * @return
    */
   private def getStreamSegmentId(carbonTable: CarbonTable): String = {
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val fileType = 
FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath)
-    if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, 
fileType)) {
+    val segmentId = StreamSegment.open(carbonTable)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
segmentId)
+    val fileType = FileFactory.getFileType(segmentDir)
+    if (!FileFactory.isFileExist(segmentDir, fileType)) {
       // Create table directory path, in case of enabling hive metastore first 
load may not have
       // table folder created.
-      FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
+      FileFactory.mkdirs(segmentDir, fileType)
     }
-    val segmentId = StreamSegment.open(carbonTable)
-    val segmentDir = carbonTablePath.getSegmentDir(segmentId)
     if (FileFactory.isFileExist(segmentDir, fileType)) {
       // recover fault
       StreamSegment.recoverSegmentIfRequired(segmentDir)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 4f1c999..6e6d092 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -64,9 +64,7 @@ class CarbonAppendableStreamSink(
     carbonLoadModel: CarbonLoadModel,
     server: Option[DictionaryServer]) extends Sink {
 
-  private val carbonTablePath = CarbonStorePath
-    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-  private val fileLogPath = carbonTablePath.getStreamingLogDir
+  private val fileLogPath = 
CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
   private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, 
sparkSession, fileLogPath)
   // prepare configuration
   private val hadoopConf = {
@@ -166,12 +164,12 @@ class CarbonAppendableStreamSink(
    * if the directory size of current segment beyond the threshold, hand off 
new segment
    */
   private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
currentSegmentId)
     val fileType = FileFactory.getFileType(segmentDir)
     if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
       val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
       currentSegmentId = newSegmentId
-      val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+      val newSegmentDir = 
CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
       FileFactory.mkdirs(newSegmentDir, fileType)
 
       // TODO trigger hand off operation
@@ -270,15 +268,13 @@ object CarbonAppendableStreamSink {
         }
 
         // update data file info in index file
-        val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
+        StreamSegment.updateIndexFile(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
 
       } catch {
         // catch fault of executor side
         case t: Throwable =>
-          val tablePath =
-            
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-          val segmentDir = tablePath.getSegmentDir(segmentId)
+          val segmentDir = 
CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
           StreamSegment.recoverSegmentIfRequired(segmentDir)
           LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
           committer.abortJob(job)

Reply via email to