http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index a8db6c9..bbc3697 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -56,43 +55,39 @@ public class TableProcessingOperations {
    */
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    String metaDataLocation = carbonTable.getMetadataPath();
     final LoadMetadataDetails[] details = 
SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getTablePath(), 
carbonTable.getCarbonTableIdentifier());
 
     //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String partitionPath = carbonTablePath.getPartitionDir();
-      FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, 
fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                
CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
+    String partitionPath = 
CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+    if (FileFactory.isFileExist(partitionPath, fileType)) {
+      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, 
fileType);
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile path) {
           String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+              CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+          boolean found = false;
+          for (int j = 0; j < details.length; j++) {
+            if (details[j].getLoadName().equals(segmentId)) {
+              found = true;
+              break;
             }
           }
+          return !found;
+        }
+      });
+      for (int k = 0; k < listFiles.length; k++) {
+        String segmentId =
+            CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+        if (isCompactionFlow) {
+          if (segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
+        } else {
+          if (!segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4cd5014..193d192 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -34,8 +34,6 @@ import 
org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import 
org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -105,12 +103,11 @@ public class FieldEncoderFactory {
           ColumnIdentifier parentColumnIdentifier =
               new ColumnIdentifier(parentColumnTableRelation.getColumnId(), 
null,
                   dataField.getColumn().getDataType());
-          CarbonTablePath carbonTablePath =
-              CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
           AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
               AbsoluteTableIdentifier.from(
-              CarbonUtil.getNewTablePath(carbonTablePath, 
parentTableIdentifier.getTableName()),
-              parentTableIdentifier);
+                  CarbonTablePath.getNewTablePath(
+                      absoluteTableIdentifier.getTablePath(), 
parentTableIdentifier.getTableName()),
+                  parentTableIdentifier);
           identifier = new 
DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType());
           return new DictionaryFieldConverterImpl(dataField, cache, 
parentAbsoluteTableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index d3caa99..a08177a 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger;
 
 import java.util.List;
 
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor {
   public abstract boolean execute(List<RawResultIterator> resultIteratorList);
 
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
-      CompactionType compactionType, CarbonTable carbonTable,
-      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+      CompactionType compactionType, CarbonFactDataHandlerModel 
carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
       long taskNo = 
CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
-          CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
-              carbonTable.getCarbonTableIdentifier()));
+          loadModel.getTablePath());
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.
       long index = taskNo + 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 2a69f0d..a4d3d2b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -272,7 +272,7 @@ public class CarbonCompactionUtil {
   public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
       List<CarbonTableIdentifier> skipList) {
     for (CarbonTable ctable : carbonTables) {
-      String metadataPath = ctable.getMetaDataFilepath();
+      String metadataPath = ctable.getMetadataPath();
       // check for the compaction required file and at the same time exclude the tables which are
       // present in the skip list.
       if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 0eadc7f..c43dbf9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -31,7 +31,6 @@ import 
org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
@@ -42,7 +41,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -167,15 +165,13 @@ public final class CarbonDataMergerUtil {
     // End Timestamp.
 
     // Table Update Status Metadata Update.
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(identifier);
 
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
 
     ICarbonLock updateLock = 
segmentUpdateStatusManager.getTableUpdateStatusLock();
     ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
@@ -222,7 +218,7 @@ public final class CarbonDataMergerUtil {
           }
 
           LoadMetadataDetails[] loadDetails =
-              segmentStatusManager.readLoadMetadata(metaDataFilepath);
+              SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
           for (LoadMetadataDetails loadDetail : loadDetails) {
             if (loadsToMerge.contains(loadDetail)) {
@@ -235,18 +231,18 @@ public final class CarbonDataMergerUtil {
             }
           }
 
-          segmentUpdateStatusManager
-              .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
-          segmentStatusManager
-              
.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), 
loadDetails);
+          segmentUpdateStatusManager.writeLoadDetailsIntoFile(
+              Arrays.asList(updateLists), timestamp);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails);
           status = true;
         } else {
           LOGGER.error("Not able to acquire the lock.");
           status = false;
         }
       } catch (IOException e) {
-        LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
-            .getMetadataDirectoryPath());
+        LOGGER.error("Error while updating metadata. The metadata file path is " +
+            CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         status = false;
 
       } finally {
@@ -282,9 +278,9 @@ public final class CarbonDataMergerUtil {
       String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel 
carbonLoadModel,
       CompactionType compactionType) throws IOException {
     boolean tableStatusUpdationStatus = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -293,10 +289,7 @@ public final class CarbonDataMergerUtil {
         LOGGER.info("Acquired lock for the table " + 
carbonLoadModel.getDatabaseName() + "."
             + carbonLoadModel.getTableName() + " for table status updation ");
 
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+        String statusFilePath = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
         LoadMetadataDetails[] loadDetails = 
SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
@@ -595,10 +588,6 @@ public final class CarbonDataMergerUtil {
     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
-    CarbonTableIdentifier tableIdentifier =
-        
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-
-
     // total length
     long totalLength = 0;
 
@@ -613,7 +602,7 @@ public final class CarbonDataMergerUtil {
       String segId = segment.getLoadName();
       // variable to store one  segment size across partition.
       long sizeOfOneSegmentAcrossPartition =
-          getSizeOfSegment(tablePath, tableIdentifier, segId);
+          getSizeOfSegment(tablePath, segId);
 
       // if size of a segment is greater than the Major compaction size. then ignore it.
       if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -652,35 +641,17 @@ public final class CarbonDataMergerUtil {
   /**
    * For calculating the size of the specified segment
    * @param tablePath the store path of the segment
-   * @param tableIdentifier identifier of table that the segment belong to
    * @param segId segment id
    * @return the data size of the segment
    */
-  private static long getSizeOfSegment(String tablePath,
-      CarbonTableIdentifier tableIdentifier, String segId) {
-    String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
+  private static long getSizeOfSegment(String tablePath, String segId) {
+    String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId);
     CarbonFile segmentFolder =
         FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
     return getSizeOfFactFileInLoad(segmentFolder);
   }
 
   /**
-   * This method will get the store location for the given path, segemnt id and partition id
-   *
-   * @param tablePath
-   * @param carbonTableIdentifier identifier of catbon table that the segment belong to
-   * @param segmentId segment id
-   * @return the store location of the segment
-   */
-  private static String getStoreLocation(String tablePath,
-      CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
-    return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
-  }
-
-
-  /**
    * Identify the segments to be merged based on the segment count
    *
    * @param listOfSegmentsAfterPreserve the list of segments after
@@ -1022,21 +993,19 @@ public final class CarbonDataMergerUtil {
    * if UpdateDelta Files are more than IUD Compaction threshold.
    *
    * @param seg
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param segmentUpdateStatusManager
    * @param numberDeltaFilesThreshold
    * @return
    */
   public static Boolean checkUpdateDeltaFilesInSeg(String seg,
-      AbsoluteTableIdentifier absoluteTableIdentifier,
+      AbsoluteTableIdentifier identifier,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int 
numberDeltaFilesThreshold) {
 
     CarbonFile[] updateDeltaFiles = null;
     Set<String> uniqueBlocks = new HashSet<String>();
 
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg);
+    String segmentPath = 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg);
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, 
FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();
@@ -1282,15 +1251,12 @@ public final class CarbonDataMergerUtil {
     CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, 
timestamp, true);
 
     // Update the Table Status.
-    String metaDataFilepath = table.getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier = 
table.getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier);
+    String metaDataFilepath = table.getMetadataPath();
+    AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
 
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    String tableStatusPath = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
 
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
@@ -1304,7 +1270,7 @@ public final class CarbonDataMergerUtil {
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+                SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
           if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
@@ -1313,7 +1279,7 @@ public final class CarbonDataMergerUtil {
           }
         }
         try {
-          segmentStatusManager
+          SegmentStatusManager
                   .writeLoadDetailsIntoFile(tableStatusPath, 
listOfLoadFolderDetailsArray);
         } catch (IOException e) {
           return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index ff65db2..8fc6e66 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -404,8 +404,7 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = 
CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, 
segmentProperties, tableName,
             tempStoreLocation);
-    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(carbonLoadModel, compactionType, 
carbonFactDataHandlerModel);
     dataHandler = 
CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
         CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 3d0700b..6f506b1 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -72,8 +72,7 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = 
CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, 
tableName,
             tempStoreLocation);
-    setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(loadModel, compactionType, 
carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     dataHandler = new 
CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 9f3c86f..bc87823 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -35,7 +35,6 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -308,8 +307,7 @@ public class CarbonFactDataHandlerModel {
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), 
loadModel.getDatabaseName(),
-            tableName, loadModel.getSegmentId());
+        .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), 
loadModel.getSegmentId());
     
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = 
carbonTable.getDimensionByTableName(tableName);
     boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -334,10 +332,9 @@ public class CarbonFactDataHandlerModel {
    * @return data directory path
    */
   private static String 
getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
-    AbsoluteTableIdentifier absoluteTableIdentifier = 
configuration.getTableIdentifier();
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+    AbsoluteTableIdentifier identifier = configuration.getTableIdentifier();
     String carbonDataDirectoryPath =
-        
carbonTablePath.getCarbonDataDirectoryPath(configuration.getSegmentId());
+        CarbonTablePath.getSegmentPath(identifier.getTablePath(), 
configuration.getSegmentId());
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index cfe6e31..ccde9e1 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -44,7 +43,6 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -143,12 +141,9 @@ public final class CarbonDataProcessorUtil {
     String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, 
File.pathSeparator);
     String[] localDataFolderLocArray = new 
String[baseTmpStorePathArray.length];
 
-    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
       String tmpStore = baseTmpStorePathArray[i];
-      CarbonTablePath carbonTablePath =
-          CarbonStorePath.getCarbonTablePath(tmpStore, 
carbonTable.getCarbonTableIdentifier());
-      String carbonDataDirectoryPath = 
carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+      String carbonDataDirectoryPath = 
CarbonTablePath.getSegmentPath(tmpStore, segmentId);
 
       localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + 
taskId;
     }
@@ -375,12 +370,9 @@ public final class CarbonDataProcessorUtil {
    * @return data directory path
    */
   public static String checkAndCreateCarbonStoreLocation(String 
factStoreLocation,
-      String databaseName, String tableName, String segmentId) {
-    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    CarbonTableIdentifier carbonTableIdentifier = 
carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(factStoreLocation, 
carbonTableIdentifier);
-    String carbonDataDirectoryPath = 
carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+      String segmentId) {
+    String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
+        factStoreLocation, segmentId);
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 083128d..e6a3323 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -45,7 +45,6 @@ import 
org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -54,7 +53,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.merger.NodeBlockRelation;
@@ -73,11 +71,8 @@ public final class CarbonLoaderUtil {
   }
 
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) 
{
-    CarbonTable carbonTable = 
loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(loadModel.getTablePath(), 
carbonTable.getCarbonTableIdentifier());
-
-    String segmentPath = 
carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
     deleteStorePath(segmentPath);
   }
 
@@ -90,33 +85,26 @@ public final class CarbonLoaderUtil {
    */
   public static boolean isValidSegment(CarbonLoadModel loadModel,
       int currentLoad) {
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
-        .getCarbonTable();
-    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
 
     int fileCount = 0;
-    int partitionCount = carbonTable.getPartitionCount();
-    for (int i = 0; i < partitionCount; i++) {
-      String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
-          currentLoad + "");
-      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
-          FileFactory.getFileType(segmentPath));
-      CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-
-        @Override
-        public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(
-              CarbonTablePath.getCarbonIndexExtension())
-              || file.getName().endsWith(
-              CarbonTablePath.getCarbonDataExtension());
-        }
-
-      });
-      fileCount += files.length;
-      if (files.length > 0) {
-        return true;
+    String segmentPath = CarbonTablePath.getSegmentPath(
+        loadModel.getTablePath(), currentLoad + "");
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+        FileFactory.getFileType(segmentPath));
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+      @Override
+      public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(
+            CarbonTablePath.getCarbonIndexExtension())
+            || file.getName().endsWith(
+            CarbonTablePath.getCarbonDataExtension());
       }
+
+    });
+    fileCount += files.length;
+    if (files.length > 0) {
+      return true;
     }
     if (fileCount == 0) {
       return false;
@@ -165,21 +153,20 @@ public final class CarbonLoaderUtil {
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean 
insertOverwrite, String uuid)
       throws IOException {
     boolean status = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+    String metadataPath = 
CarbonTablePath.getMetadataPath(identifier.getTablePath());
     FileType fileType = FileFactory.getFileType(metadataPath);
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType);
     }
     String tableStatusPath;
     if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() 
&& !uuid.isEmpty()) {
-      tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+      tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(uuid);
     } else {
-      tableStatusPath = carbonTablePath.getTableStatusFilePath();
+      tableStatusPath = CarbonTablePath.getTableStatusFilePath();
     }
-    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(identifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     int retryCount = CarbonLockUtil
         
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
@@ -193,7 +180,8 @@ public final class CarbonLoaderUtil {
             "Acquired lock for table" + loadModel.getDatabaseName() + "." + 
loadModel.getTableName()
                 + " for table status updation");
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            
SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(identifier.getTablePath()));
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         List<CarbonFile> staleFolders = new ArrayList<>();
@@ -220,12 +208,12 @@ public final class CarbonLoaderUtil {
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
             if (entry.getSegmentStatus() == 
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && SegmentStatusManager.isLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                    identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert overwrite is in 
progress");
             } else if (newMetaEntry.getSegmentStatus() == 
SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
                 && SegmentStatusManager.isLoadInProgress(
-                    absoluteTableIdentifier, entry.getLoadName())) {
+                    identifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert into or load is in 
progress");
             }
           }
@@ -248,7 +236,7 @@ public final class CarbonLoaderUtil {
                 entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
                 // For insert overwrite, we will delete the old segment folder 
immediately
                 // So collect the old segments here
-                addToStaleFolders(carbonTablePath, staleFolders, entry);
+                addToStaleFolders(identifier, staleFolders, entry);
               }
             }
           }
@@ -257,7 +245,7 @@ public final class CarbonLoaderUtil {
         // when no records are inserted then newSegmentEntry will be 
SegmentStatus.MARKED_FOR_DELETE
         // so empty segment folder should be deleted
         if (newMetaEntry.getSegmentStatus() == 
SegmentStatus.MARKED_FOR_DELETE) {
-          addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+          addToStaleFolders(identifier, staleFolders, newMetaEntry);
         }
 
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, 
listOfLoadFolderDetails
@@ -291,9 +279,10 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+  private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws 
IOException {
-    String path = 
carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
+    String path = CarbonTablePath.getSegmentPath(
+        identifier.getTablePath(), entry.getLoadName());
     // add to the deletion list only if file exist else HDFS file system will 
throw
     // exception while deleting the file if file path does not exist
     if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -319,11 +308,9 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(AbsoluteTableIdentifier 
absoluteTableIdentifier,
+  public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
       List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+    String dataLoadLocation = 
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
 
     DataOutputStream dataOutputStream;
     Gson gsonObjectToWrite = new Gson();
@@ -871,10 +858,8 @@ public final class CarbonLoaderUtil {
    * This method will get the store location for the given path, segment id 
and partition id
    */
   public static void checkAndCreateCarbonDataLocation(String segmentId, 
CarbonTable carbonTable) {
-    CarbonTableIdentifier carbonTableIdentifier = 
carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), 
carbonTableIdentifier);
-    String segmentFolder = 
carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+    String segmentFolder = CarbonTablePath.getSegmentPath(
+        carbonTable.getTablePath(), segmentId);
     CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
@@ -903,10 +888,8 @@ public final class CarbonLoaderUtil {
    */
   public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails 
loadMetadataDetails,
       String segmentId, CarbonTable carbonTable) throws IOException {
-    CarbonTablePath carbonTablePath =
-        
CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
     Map<String, Long> dataIndexSize =
-        CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+        CarbonUtil.getDataSizeAndIndexSize(carbonTable.getTablePath(), 
segmentId);
     Long dataSize = 
dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
     loadMetadataDetails.setDataSize(String.valueOf(dataSize));
     Long indexSize = 
dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index f9f3e20..1fdce32 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public final class DeleteLoadFolders {
@@ -47,15 +46,14 @@ public final class DeleteLoadFolders {
   /**
    * returns segment path
    *
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @param oneLoad
    * @return
    */
-  private static String getSegmentPath(AbsoluteTableIdentifier 
absoluteTableIdentifier,
+  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
       LoadMetadataDetails oneLoad) {
-    CarbonTablePath carbon = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String segmentId = oneLoad.getLoadName();
-    return carbon.getCarbonDataDirectoryPath(segmentId);
+    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), 
segmentId);
   }
 
   public static void physicalFactAndMeasureMetadataDeletion(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 63320ef..53f1296 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -71,12 +71,12 @@ public class BlockIndexStoreTest extends TestCase {
 //            file.length(), ColumnarFormatVersion.V1, null);
 //    CarbonTableIdentifier carbonTableIdentifier =
 //            new 
CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//    AbsoluteTableIdentifier identifier =
 //        new AbsoluteTableIdentifier("/src/test/resources", 
carbonTableIdentifier);
 //    try {
 //
 //      List<TableBlockUniqueIdentifier> tableBlockInfoList =
-//          getTableBlockUniqueIdentifierList(Arrays.asList(new 
TableBlockInfo[] { info }), absoluteTableIdentifier);
+//          getTableBlockUniqueIdentifierList(Arrays.asList(new 
TableBlockInfo[] { info }), identifier);
 //      List<AbstractIndex> loadAndGetBlocks = 
cache.getAll(tableBlockInfoList);
 //      assertTrue(loadAndGetBlocks.size() == 1);
 //    } catch (Exception e) {
@@ -84,7 +84,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    }
 //    List<String> segmentIds = new ArrayList<>();
 //      segmentIds.add(info.getSegmentId());
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+//    cache.removeTableBlocks(segmentIds, identifier);
 //  }
 //
   private List<TableBlockUniqueIdentifier> 
getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos,
@@ -120,19 +120,19 @@ public class BlockIndexStoreTest extends TestCase {
 //
 //    CarbonTableIdentifier carbonTableIdentifier =
 //            new 
CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//    AbsoluteTableIdentifier identifier =
 //        new AbsoluteTableIdentifier("/src/test/resources", 
carbonTableIdentifier);
 //    ExecutorService executor = Executors.newFixedThreadPool(3);
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info, info1 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(
 //        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, 
info3, info4 }),
-//            absoluteTableIdentifier));
+//            identifier));
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info, info1 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(
 //        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, 
info3, info4 }),
-//            absoluteTableIdentifier));
+//            identifier));
 //    executor.shutdown();
 //    try {
 //      executor.awaitTermination(1, TimeUnit.DAYS);
@@ -143,7 +143,7 @@ public class BlockIndexStoreTest extends TestCase {
 //        Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, 
info4 });
 //    try {
 //      List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, 
absoluteTableIdentifier);
+//          getTableBlockUniqueIdentifierList(tableBlockInfos, identifier);
 //      List<AbstractIndex> loadAndGetBlocks = 
cache.getAll(tableBlockUniqueIdentifiers);
 //      assertTrue(loadAndGetBlocks.size() == 5);
 //    } catch (Exception e) {
@@ -153,7 +153,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
 //      segmentIds.add(tableBlockInfo.getSegmentId());
 //    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+//    cache.removeTableBlocks(segmentIds, identifier);
 //  }
 //
 //  public void 
testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
@@ -191,18 +191,18 @@ public class BlockIndexStoreTest extends TestCase {
 //
 //    CarbonTableIdentifier carbonTableIdentifier =
 //            new 
CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-//    AbsoluteTableIdentifier absoluteTableIdentifier =
+//    AbsoluteTableIdentifier identifier =
 //        new AbsoluteTableIdentifier("/src/test/resources", 
carbonTableIdentifier);
 //    ExecutorService executor = Executors.newFixedThreadPool(3);
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info, info1 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(
 //        new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, 
info3, info4 }),
-//            absoluteTableIdentifier));
+//            identifier));
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info5, info6 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //    executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] 
{ info7 }),
-//        absoluteTableIdentifier));
+//        identifier));
 //
 //    executor.shutdown();
 //    try {
@@ -215,7 +215,7 @@ public class BlockIndexStoreTest extends TestCase {
 //        .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, 
info5, info6, info7 });
 //    try {
 //      List<TableBlockUniqueIdentifier> blockUniqueIdentifierList =
-//          getTableBlockUniqueIdentifierList(tableBlockInfos, 
absoluteTableIdentifier);
+//          getTableBlockUniqueIdentifierList(tableBlockInfos, identifier);
 //      List<AbstractIndex> loadAndGetBlocks = 
cache.getAll(blockUniqueIdentifierList);
 //      assertTrue(loadAndGetBlocks.size() == 8);
 //    } catch (Exception e) {
@@ -225,7 +225,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
 //      segmentIds.add(tableBlockInfo.getSegmentId());
 //    }
-//    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+//    cache.removeTableBlocks(segmentIds, identifier);
 //  }
 
   private class BlockLoaderThread implements Callable<Void> {
@@ -248,7 +248,7 @@ public class BlockIndexStoreTest extends TestCase {
   }
 
   private static File getPartFile() {
-    String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
+    String path = StoreCreator.getIdentifier().getTablePath()
         + "/Fact/Part0/Segment_0";
     File file = new File(path);
     File[] files = file.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 7f0aef6..d42dcde 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
@@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  */
 public class StoreCreator {
 
-  private static AbsoluteTableIdentifier absoluteTableIdentifier;
+  private static AbsoluteTableIdentifier identifier;
   private static String storePath = "";
   static {
     try {
       storePath = new File("target/store").getCanonicalPath();
       String dbName = "testdb";
       String tableName = "testtable";
-      absoluteTableIdentifier =
+      identifier =
           AbsoluteTableIdentifier.from(
               storePath + "/testdb/testtable",
               new CarbonTableIdentifier(dbName, tableName, 
UUID.randomUUID().toString()));
@@ -114,8 +113,8 @@ public class StoreCreator {
     }
   }
 
-  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
-    return absoluteTableIdentifier;
+  public static AbsoluteTableIdentifier getIdentifier() {
+    return identifier;
   }
 
   /**
@@ -134,12 +133,12 @@ public class StoreCreator {
       CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
       CarbonLoadModel loadModel = new CarbonLoadModel();
       loadModel.setCarbonDataLoadSchema(schema);
-      
loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
-      
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
-      
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+      
loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+      
loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+      
loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
       loadModel.setFactFilePath(factFilePath);
       loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
-      loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+      loadModel.setTablePath(identifier.getTablePath());
       loadModel.setDateFormat(null);
       
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
           CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -175,9 +174,9 @@ public class StoreCreator {
 
   private static CarbonTable createTable() throws IOException {
     TableInfo tableInfo = new TableInfo();
-    
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+    
tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
-    
tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    
tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
     List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
     ArrayList<Encoding> encodings = new ArrayList<>();
     encodings.add(Encoding.DICTIONARY);
@@ -257,16 +256,13 @@ public class StoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
-        absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+        identifier.getCarbonTableIdentifier().getTableUniqueName()
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
+    tableInfo.setTablePath(identifier.getTablePath());
 
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    String schemaFilePath = carbonTablePath.getSchemaFilePath();
+    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
     String schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(schemaFilePath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
@@ -329,7 +325,7 @@ public class StoreCreator {
       writer.close();
       writer.commit();
       Dictionary dict = (Dictionary) dictCache.get(
-          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+          new DictionaryColumnUniqueIdentifier(identifier,
                          columnIdentifier, dims.get(i).getDataType()));
       CarbonDictionarySortInfoPreparator preparator =
           new CarbonDictionarySortInfoPreparator();
@@ -444,7 +440,7 @@ public class StoreCreator {
     
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
-    String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + 
File.separator
+    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + 
File.separator
         + CarbonCommonConstants.LOADMETADATA_FILENAME;
 
     DataOutputStream dataOutputStream;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7b823ac..8c9889d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
@@ -60,8 +59,6 @@ public class StreamSegment {
    * get stream segment or create new stream segment if not exists
    */
   public static String open(CarbonTable table) throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -72,7 +69,8 @@ public class StreamSegment {
                 + " for stream table get or create segment");
 
         LoadMetadataDetails[] details =
-            
SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         LoadMetadataDetails streamSegment = null;
         for (LoadMetadataDetails detail : details) {
           if (FileFormat.ROW_V1 == detail.getFileFormat()) {
@@ -97,8 +95,8 @@ public class StreamSegment {
             newDetails[i] = details[i];
           }
           newDetails[i] = newDetail;
-          SegmentStatusManager
-              .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), 
newDetails);
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              CarbonTablePath.getTableStatusFilePath(table.getTablePath()), 
newDetails);
           return newDetail.getLoadName();
         } else {
           return streamSegment.getLoadName();
@@ -126,8 +124,6 @@ public class StreamSegment {
    */
   public static String close(CarbonTable table, String segmentId)
       throws IOException {
-    CarbonTablePath tablePath =
-        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
     SegmentStatusManager segmentStatusManager =
         new SegmentStatusManager(table.getAbsoluteTableIdentifier());
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -138,7 +134,8 @@ public class StreamSegment {
                 + " for stream table finish segment");
 
         LoadMetadataDetails[] details =
-            
SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+            SegmentStatusManager.readLoadMetadata(
+                CarbonTablePath.getMetadataPath(table.getTablePath()));
         for (LoadMetadataDetails detail : details) {
           if (segmentId.equals(detail.getLoadName())) {
             detail.setLoadEndTime(System.currentTimeMillis());
@@ -162,7 +159,8 @@ public class StreamSegment {
         }
         newDetails[i] = newDetail;
         SegmentStatusManager
-            .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), 
newDetails);
+            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+                table.getTablePath()), newDetails);
         return newDetail.getLoadName();
       } else {
         LOGGER.error(
@@ -192,7 +190,7 @@ public class StreamSegment {
     try {
       if (statusLock.lockWithRetries()) {
         LoadMetadataDetails[] details =
-            
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+            
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
         boolean updated = false;
         for (LoadMetadataDetails detail : details) {
           if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
@@ -202,10 +200,8 @@ public class StreamSegment {
           }
         }
         if (updated) {
-          CarbonTablePath tablePath =
-              
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
           SegmentStatusManager.writeLoadDetailsIntoFile(
-              tablePath.getTableStatusFilePath(),
+              
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
               details);
         }
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index ec6ab1a..5c6165d 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, 
CarbonStreamRecordReader}
@@ -220,7 +220,6 @@ object StreamHandoffRDD {
   ): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = carbonTable.getAbsoluteTableIdentifier
-    val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
     var continueHandoff = false
     // require handoff lock on table
     val lock = CarbonLockFactory.getCarbonLockObj(identifier, 
LockUsage.HANDOFF_LOCK)
@@ -237,7 +236,7 @@ object StreamHandoffRDD {
           try {
             if (statusLock.lockWithRetries()) {
               loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-                tablePath.getMetadataDirectoryPath)
+                CarbonTablePath.getMetadataPath(identifier.getTablePath))
             }
           } finally {
             if (null != statusLock) {
@@ -359,19 +358,16 @@ object StreamHandoffRDD {
       loadModel: CarbonLoadModel
   ): Boolean = {
     var status = false
-    val metaDataFilepath =
-      loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
-    val identifier =
-      loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-    val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+    val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
+    val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
+    val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
     val fileType = FileFactory.getFileType(metadataPath)
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType)
     }
-    val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
     val segmentStatusManager = new SegmentStatusManager(identifier)
-    val carbonLock = segmentStatusManager.getTableStatusLock()
+    val carbonLock = segmentStatusManager.getTableStatusLock
     try {
       if (carbonLock.lockWithRetries()) {
         LOGGER.info(
@@ -404,7 +400,7 @@ object StreamHandoffRDD {
         status = true
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
-          .getDatabaseName() + "." + loadModel.getTableName());
+          .getDatabaseName() + "." + loadModel.getTableName())
       }
     } finally {
       if (carbonLock.unlock()) {
@@ -415,6 +411,6 @@ object StreamHandoffRDD {
                      "." + loadModel.getTableName() + " during table status updation")
       }
     }
-    return status
+    status
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index f2274be..c417fbe 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -126,16 +126,14 @@ object StreamSinkFactory {
    * @return
    */
   private def getStreamSegmentId(carbonTable: CarbonTable): String = {
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath)
-    if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) {
+    val segmentId = StreamSegment.open(carbonTable)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+    val fileType = FileFactory.getFileType(segmentDir)
+    if (!FileFactory.isFileExist(segmentDir, fileType)) {
       // Create table directory path, in case of enabling hive metastore first load may not have
       // table folder created.
-      FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
+      FileFactory.mkdirs(segmentDir, fileType)
     }
-    val segmentId = StreamSegment.open(carbonTable)
-    val segmentDir = carbonTablePath.getSegmentDir(segmentId)
     if (FileFactory.isFileExist(segmentDir, fileType)) {
       // recover fault
       StreamSegment.recoverSegmentIfRequired(segmentDir)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfa9a2c2/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 45bc19a..ff483e5 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -62,9 +62,7 @@ class CarbonAppendableStreamSink(
     carbonLoadModel: CarbonLoadModel,
     server: Option[DictionaryServer]) extends Sink {
 
-  private val carbonTablePath = CarbonStorePath
-    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-  private val fileLogPath = carbonTablePath.getStreamingLogDir
+  private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
   private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
   // prepare configuration
   private val hadoopConf = {
@@ -149,12 +147,12 @@ class CarbonAppendableStreamSink(
    * if the directory size of current segment beyond the threshold, hand off new segment
    */
   private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
     val fileType = FileFactory.getFileType(segmentDir)
     if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
       val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
       currentSegmentId = newSegmentId
-      val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+      val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
       FileFactory.mkdirs(newSegmentDir, fileType)
 
       // TODO trigger hand off operation
@@ -250,15 +248,13 @@ object CarbonAppendableStreamSink {
         }
 
         // update data file info in index file
-        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
+        StreamSegment.updateIndexFile(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
 
       } catch {
         // catch fault of executor side
         case t: Throwable =>
-          val tablePath =
-            CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-          val segmentDir = tablePath.getSegmentDir(segmentId)
+          val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
           StreamSegment.recoverSegmentIfRequired(segmentDir)
           LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
           committer.abortJob(job)

Reply via email to