http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 9799ac2..b7b5e43 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -22,16 +22,14 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
 import org.apache.hadoop.fs.Path;
 
-
 /**
  * Helps to get Table content paths.
  */
-public class CarbonTablePath extends Path {
+public class CarbonTablePath {
 
   private static final String METADATA_DIR = "Metadata";
   private static final String DICTIONARY_EXT = ".dict";
@@ -54,19 +52,10 @@ public class CarbonTablePath extends Path {
   private static final String STREAMING_LOG_DIR = "log";
   private static final String STREAMING_CHECKPOINT_DIR = "checkpoint";
 
-  private String tablePath;
-  private CarbonTableIdentifier carbonTableIdentifier;
-
   /**
-   * structure CarbonTablePath object to manage table paths
-   *
-   * @param carbonTableIdentifier identifier of carbon table that the segment 
belong to
-   * @param tablePathString the store path of the segment
+   * This class provides static utility only.
    */
-  public CarbonTablePath(CarbonTableIdentifier carbonTableIdentifier, String 
tablePathString) {
-    super(tablePathString);
-    this.carbonTableIdentifier = carbonTableIdentifier;
-    this.tablePath = tablePathString;
+  private CarbonTablePath() {
   }
 
   /**
@@ -130,40 +119,21 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * gets table path
+   * Return absolute path of dictionary file
    */
-  public String getPath() {
-    return tablePath;
+  public static String getDictionaryFilePath(String tablePath, String 
columnId) {
+    return getMetadataPath(tablePath) + File.separator + 
getDictionaryFileName(columnId);
   }
 
   /**
-   * @param columnId unique column identifier
-   * @return absolute path of dictionary file
+   * Return absolute path of dictionary file
    */
-  public String getDictionaryFilePath(String columnId) {
-    return getMetaDataDir() + File.separator + getDictionaryFileName(columnId);
-  }
-
-  /**
-   * @param dictionaryPath
-   * @param columnId unique column identifier
-   * @return absolute path of dictionary file
-   */
-  public String getDictionaryFilePath(String dictionaryPath, String columnId) {
+  public static String getExternalDictionaryFilePath(String dictionaryPath, 
String columnId) {
     return dictionaryPath + File.separator + getDictionaryFileName(columnId);
   }
 
   /**
-   * This method will return the metadata directory location for a table
-   *
-   * @return
-   */
-  public String getMetadataDirectoryPath() {
-    return getMetaDataDir();
-  }
-
-  /**
-   * Return metadata path based on `tablePath`
+   * Return metadata path
    */
   public static String getMetadataPath(String tablePath) {
     return tablePath + File.separator + METADATA_DIR;
@@ -184,67 +154,42 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * @param columnId unique column identifier
-   * @return absolute path of dictionary meta file
+   * Return absolute path of dictionary meta file
    */
-  public String getDictionaryMetaFilePath(String columnId) {
-    return getMetaDataDir() + File.separator + columnId + DICTIONARY_META_EXT;
-  }
-
-  /**
-   * @param dictionaryPath
-   * @param columnId unique column identifier
-   * @return absolute path of dictionary file
-   */
-  public String getDictionaryMetaFilePath(String dictionaryPath, String 
columnId) {
+  public static String getExternalDictionaryMetaFilePath(String 
dictionaryPath, String columnId) {
     return dictionaryPath + File.separator + columnId + DICTIONARY_META_EXT;
   }
 
   /**
-   * @param columnId unique column identifier
-   * @return absolute path of sort index file
+   * Return absolute path of dictionary meta file
    */
-  public String getSortIndexFilePath(String columnId) {
-    return getMetaDataDir() + File.separator + columnId + SORT_INDEX_EXT;
+  public static String getDictionaryMetaFilePath(String tablePath, String 
columnId) {
+    return getMetadataPath(tablePath) + File.separator + columnId + 
DICTIONARY_META_EXT;
   }
 
   /**
-   * @param dictionaryPath
-   * @param columnId unique column identifier
-   * @return absolute path of dictionary file
+   * Return absolute path of sort index file
    */
-  public String getSortIndexFilePath(String dictionaryPath, String columnId) {
-    return dictionaryPath + File.separator + columnId + SORT_INDEX_EXT;
+  public static String getSortIndexFilePath(String tablePath, String columnId) 
{
+    return getMetadataPath(tablePath) + File.separator + columnId + 
SORT_INDEX_EXT;
   }
 
   /**
-   *
-   * @param columnId
-   * @param dictOffset
-   * @return absolute path of sortindex with appeneded dictionary offset
+   * Return sortindex file path based on specified dictionary path
    */
-  public String getSortIndexFilePath(String columnId, long dictOffset) {
-    return getMetaDataDir() + File.separator + columnId + "_" + dictOffset + 
SORT_INDEX_EXT;
+  public static String getExternalSortIndexFilePath(String dictionaryPath, 
String columnId) {
+    return dictionaryPath + File.separator + columnId + SORT_INDEX_EXT;
   }
 
   /**
-   * @param dictionaryPath
-   * @param columnId unique column identifier
-   * @param dictOffset
-   * @return absolute path of dictionary file
+   * Return sortindex file path for columnId and offset based on specified 
dictionary path
    */
-  public String getSortIndexFilePath(String dictionaryPath, String columnId, 
long dictOffset) {
+  public static String getExternalSortIndexFilePath(String dictionaryPath, 
String columnId,
+      long dictOffset) {
     return dictionaryPath + File.separator + columnId + "_" + dictOffset + 
SORT_INDEX_EXT;
   }
 
   /**
-   * @return absolute path of schema file
-   */
-  public String getSchemaFilePath() {
-    return getActualSchemaFilePath(tablePath);
-  }
-
-  /**
    * return the schema file path
    * @param tablePath path to table files
    * @return schema file path
@@ -268,18 +213,11 @@ public class CarbonTablePath extends Path {
     }
   }
 
-  /**
-   * @return absolute path of table status file
-   */
-  public String getTableStatusFilePath() {
-    return getMetaDataDir() + File.separator + TABLE_STATUS_FILE;
-  }
-
-  public String getTableStatusFilePathWithUUID(String uuid) {
+  public static String getTableStatusFilePathWithUUID(String tablePath, String 
uuid) {
     if (!uuid.isEmpty()) {
-      return getTableStatusFilePath() + CarbonCommonConstants.UNDERSCORE + 
uuid;
+      return getTableStatusFilePath(tablePath) + 
CarbonCommonConstants.UNDERSCORE + uuid;
     } else {
-      return getTableStatusFilePath();
+      return getTableStatusFilePath(tablePath);
     }
   }
 
@@ -291,9 +229,9 @@ public class CarbonTablePath extends Path {
    * @param factUpdateTimeStamp unique identifier to identify an update
    * @return absolute path of data file stored in carbon data format
    */
-  public String getCarbonDataFilePath(String segmentId, Integer filePartNo, 
Long taskNo,
-      int batchNo, int bucketNumber, String factUpdateTimeStamp) {
-    return getSegmentDir(segmentId) + File.separator + getCarbonDataFileName(
+  public static String getCarbonDataFilePath(String tablePath, String 
segmentId, Integer filePartNo,
+      Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
+    return getSegmentPath(tablePath, segmentId) + File.separator + 
getCarbonDataFileName(
         filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
   }
 
@@ -305,9 +243,9 @@ public class CarbonTablePath extends Path {
    * @param segmentId   segment number
    * @return full qualified carbon index path
    */
-  public String getCarbonIndexFilePath(final String taskId, final String 
segmentId,
-      final String bucketNumber) {
-    String segmentDir = getSegmentDir(segmentId);
+  private static String getCarbonIndexFilePath(final String tablePath, final 
String taskId,
+      final String segmentId, final String bucketNumber) {
+    String segmentDir = getSegmentPath(tablePath, segmentId);
     CarbonFile carbonFile =
         FileFactory.getCarbonFile(segmentDir, 
FileFactory.getFileType(segmentDir));
 
@@ -340,27 +278,28 @@ public class CarbonTablePath extends Path {
    *        timestamp
    * @return carbon index file path
    */
-  public String getCarbonIndexFilePath(String taskId, String segmentId, String 
bucketNumber,
-      String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
+  public static String getCarbonIndexFilePath(String tablePath, String taskId, 
String segmentId,
+      String bucketNumber, String timeStamp, ColumnarFormatVersion 
columnarFormatVersion) {
     switch (columnarFormatVersion) {
       case V1:
       case V2:
-        return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
+        return getCarbonIndexFilePath(tablePath, taskId, segmentId, 
bucketNumber);
       default:
-        String segmentDir = getSegmentDir(segmentId);
+        String segmentDir = getSegmentPath(tablePath, segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(taskId,
             Integer.parseInt(bucketNumber), timeStamp);
     }
   }
 
-  public String getCarbonIndexFilePath(String taskId, String segmentId, int 
batchNo,
-      String bucketNumber, String timeStamp, ColumnarFormatVersion 
columnarFormatVersion) {
+  public static String getCarbonIndexFilePath(String tablePath, String taskId, 
String segmentId,
+      int batchNo, String bucketNumber, String timeStamp,
+      ColumnarFormatVersion columnarFormatVersion) {
     switch (columnarFormatVersion) {
       case V1:
       case V2:
-        return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
+        return getCarbonIndexFilePath(tablePath, taskId, segmentId, 
bucketNumber);
       default:
-        String segmentDir = getSegmentDir(segmentId);
+        String segmentDir = getSegmentPath(tablePath, segmentId);
         return segmentDir + File.separator + 
getCarbonIndexFileName(Long.parseLong(taskId),
             Integer.parseInt(bucketNumber), batchNo, timeStamp);
     }
@@ -375,13 +314,10 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * Gets absolute path of data file
-   *
-   * @param segmentId   unique partition identifier
-   * @return absolute path of data file stored in carbon data format
+   * Return the segment path from table path and segmentid
    */
-  public String getCarbonDataDirectoryPath(String segmentId) {
-    return getSegmentDir(segmentId);
+  public static String getSegmentPath(String tablePath, String segmentId) {
+    return getPartitionDir(tablePath) + File.separator + SEGMENT_PREFIX + 
segmentId;
   }
 
   /**
@@ -419,48 +355,46 @@ public class CarbonTablePath extends Path {
     return segmentDir + File.separator + getCarbonStreamIndexFileName();
   }
 
-  public String getSegmentDir(String segmentId) {
-    return getPartitionDir() + File.separator + SEGMENT_PREFIX + segmentId;
-  }
-
   // This partition is not used in any code logic, just keep backward 
compatibility
   public static final String DEPRECATED_PATITION_ID = "0";
 
-  public String getPartitionDir() {
-    return getFactDir() + File.separator + PARTITION_PREFIX +
-        CarbonTablePath.DEPRECATED_PATITION_ID;
+  /**
+   * Return true if tablePath exists
+   */
+  public static boolean exists(String tablePath) {
+    return FileFactory.getCarbonFile(tablePath, 
FileFactory.getFileType(tablePath)).exists();
   }
 
-  private String getMetaDataDir() {
-    return tablePath + File.separator + METADATA_DIR;
+  public static String getPartitionDir(String tablePath) {
+    return getFactDir(tablePath) + File.separator + PARTITION_PREFIX +
+        CarbonTablePath.DEPRECATED_PATITION_ID;
   }
 
-  public String getFactDir() {
+  public static String getFactDir(String tablePath) {
     return tablePath + File.separator + FACT_DIR;
   }
 
-  public String getStreamingLogDir() {
+  public static String getStreamingLogDir(String tablePath) {
     return tablePath + File.separator + STREAMING_DIR + File.separator + 
STREAMING_LOG_DIR;
   }
 
-  public String getStreamingCheckpointDir() {
+  public static String getStreamingCheckpointDir(String tablePath) {
     return tablePath + File.separator + STREAMING_DIR + File.separator + 
STREAMING_CHECKPOINT_DIR;
   }
 
-  public CarbonTableIdentifier getCarbonTableIdentifier() {
-    return carbonTableIdentifier;
-  }
-
-  @Override public boolean equals(Object o) {
-    if (!(o instanceof CarbonTablePath)) {
-      return false;
-    }
-    CarbonTablePath path = (CarbonTablePath) o;
-    return tablePath.equals(path.tablePath) && super.equals(o);
-  }
-
-  @Override public int hashCode() {
-    return super.hashCode() + tablePath.hashCode();
+  /**
+   * Gets the parent folder of the old table path and returns the new tablePath by 
appending the new
+   * tableName to the parent
+   *
+   * @param tablePath         Old tablePath
+   * @param newTableName      new table name
+   * @return the new table path
+   */
+  public static String getNewTablePath(
+      String tablePath,
+      String newTableName) {
+    Path parentPath = new Path(tablePath).getParent();
+    return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + 
newTableName;
   }
 
   /**
@@ -479,11 +413,8 @@ public class CarbonTablePath extends Path {
       return fileName.substring(startIndex, endIndex);
     }
 
-
     /**
-     * This will return the timestamp present in the delete delta file.
-     * @param fileName
-     * @return
+     * Return the timestamp present in the delete delta file.
      */
     public static String getTimeStampFromDeleteDeltaFile(String fileName) {
       return 
fileName.substring(fileName.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
@@ -491,9 +422,7 @@ public class CarbonTablePath extends Path {
     }
 
     /**
-     * This will return the timestamp present in the delete delta file.
-     * @param fileName
-     * @return
+     * Return the block name (leading part) of the delete delta file name.
      */
     public static String getBlockNameFromDeleteDeltaFile(String fileName) {
       return fileName.substring(0,
@@ -501,7 +430,7 @@ public class CarbonTablePath extends Path {
     }
 
     /**
-     * gets updated timestamp information from given carbon data file name
+     * Return the bucket number information from given carbon data file 
name
      */
     public static String getBucketNo(String carbonFilePath) {
       // Get the file name from path
@@ -519,7 +448,7 @@ public class CarbonTablePath extends Path {
     }
 
     /**
-     * gets file part number information from given carbon data file name
+     * Return the file part number information from given carbon data file name
      */
     public static String getPartNo(String carbonDataFileName) {
       // Get the file name from path
@@ -531,7 +460,7 @@ public class CarbonTablePath extends Path {
     }
 
     /**
-     * gets updated timestamp information from given carbon data file name
+     * Return the task number information from given carbon data file 
name
      */
     public static String getTaskNo(String carbonDataFileName) {
       // Get the file name from path
@@ -544,35 +473,30 @@ public class CarbonTablePath extends Path {
     }
 
     /**
-     * get the taskId part from taskNo(include taskId + batchNo)
-     * @param taskNo
-     * @return
+     * Return the taskId part from taskNo(include taskId + batchNo)
      */
     public static long getTaskIdFromTaskNo(String taskNo) {
       return Long.parseLong(taskNo.split(BATCH_PREFIX)[0]);
     }
 
+    /**
+     * Return the batch number from taskNo string
+     */
     public static int getBatchNoFromTaskNo(String taskNo) {
       return Integer.parseInt(taskNo.split(BATCH_PREFIX)[1]);
     }
 
     /**
-     * Gets the file name from file path
+     * Return the file name from file path
      */
-    private static String getFileName(String carbonDataFileName) {
-      int endIndex = 
carbonDataFileName.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
+    private static String getFileName(String dataFilePath) {
+      int endIndex = 
dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
       if (endIndex > -1) {
-        return carbonDataFileName.substring(endIndex + 1, 
carbonDataFileName.length());
+        return dataFilePath.substring(endIndex + 1, dataFilePath.length());
       } else {
-        return carbonDataFileName;
+        return dataFilePath;
       }
     }
-  }
-
-  /**
-   * To manage data path and composition
-   */
-  public static class DataPathUtil {
 
     /**
      * gets segement id from given absolute data file path
@@ -580,11 +504,11 @@ public class CarbonTablePath extends Path {
     public static String getSegmentId(String dataFileAbsolutePath) {
       // find segment id from last of data file path
       String tempdataFileAbsolutePath = dataFileAbsolutePath.replace(
-              CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, 
CarbonCommonConstants.FILE_SEPARATOR);
+          CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, 
CarbonCommonConstants.FILE_SEPARATOR);
       int endIndex = 
tempdataFileAbsolutePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
       // + 1 for size of "/"
       int startIndex = tempdataFileAbsolutePath.lastIndexOf(
-              CarbonCommonConstants.FILE_SEPARATOR, endIndex - 1) + 1;
+          CarbonCommonConstants.FILE_SEPARATOR, endIndex - 1) + 1;
       String segmentDirStr = dataFileAbsolutePath.substring(startIndex, 
endIndex);
       //identify id in segment_<id>
       String[] segmentDirSplits = segmentDirStr.split("_");
@@ -616,19 +540,16 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * returns the carbondata file name
-   *
-   * @param carbonDataFilePath carbondata file path
-   * @return
+   * Return the carbondata file name
    */
   public static String getCarbonDataFileName(String carbonDataFilePath) {
-    return carbonDataFilePath
-        
.substring(carbonDataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) 
+ 1,
-            carbonDataFilePath.indexOf(CARBON_DATA_EXT));
+    return carbonDataFilePath.substring(
+        carbonDataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 
1,
+        carbonDataFilePath.indexOf(CARBON_DATA_EXT));
   }
 
   /**
-   * @return prefix of carbon data
+   * Return prefix of carbon data
    */
   public static String getCarbonDataPrefix() {
     return DATA_PART_PREFIX;
@@ -684,40 +605,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * This method will append strings in path and return block id
-   *
-   * @param shortBlockId
-   * @return blockId
-   */
-  public static String getBlockId(String shortBlockId) {
-    String[] splitRecords = 
shortBlockId.split(CarbonCommonConstants.FILE_SEPARATOR);
-    StringBuffer sb = new StringBuffer();
-    for (int i = 0; i < splitRecords.length; i++) {
-      if (i == 0) {
-        sb.append(PARTITION_PREFIX);
-        sb.append(splitRecords[i]);
-      } else if (i == 1) {
-        sb.append(CarbonCommonConstants.FILE_SEPARATOR);
-        sb.append(SEGMENT_PREFIX);
-        sb.append(splitRecords[i]);
-      } else if (i == 2) {
-        sb.append(CarbonCommonConstants.FILE_SEPARATOR);
-        sb.append(DATA_PART_PREFIX);
-        sb.append(splitRecords[i]);
-      } else if (i == 3) {
-        sb.append(CarbonCommonConstants.FILE_SEPARATOR);
-        sb.append(splitRecords[i]);
-        sb.append(CARBON_DATA_EXT);
-      } else {
-        sb.append(CarbonCommonConstants.FILE_SEPARATOR);
-        sb.append(splitRecords[i]);
-      }
-    }
-    return sb.toString();
-  }
-
-
-  /**
    * adds data part prefix to given value
    * @return partition prefix
    */
@@ -747,13 +634,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * Get the segment path from table path and segmentid
-   */
-  public static String getSegmentPath(String tablePath, String segmentId) {
-    return tablePath + "/Fact/Part0/Segment_" + segmentId;
-  }
-
-  /**
    * Get the segment file locations of table
    */
   public static String getSegmentFilesLocation(String tablePath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
 
b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
index 31e44a2..7d829b9 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
@@ -34,7 +34,6 @@ import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -48,7 +47,7 @@ public class AbstractDictionaryCacheTest {
 
   protected CarbonTableIdentifier carbonTableIdentifier;
 
-  protected AbsoluteTableIdentifier absoluteTableIdentifier;
+  protected AbsoluteTableIdentifier identifier;
 
   protected String databaseName;
 
@@ -107,7 +106,7 @@ public class AbstractDictionaryCacheTest {
   protected DictionaryColumnUniqueIdentifier 
createDictionaryColumnUniqueIdentifier(
       String columnId) {
        ColumnIdentifier columnIdentifier = new ColumnIdentifier(columnId, 
null, DataTypes.STRING);
-    return new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, 
columnIdentifier,
+    return new DictionaryColumnUniqueIdentifier(identifier, columnIdentifier,
         DataTypes.STRING);
   }
 
@@ -130,13 +129,11 @@ public class AbstractDictionaryCacheTest {
       throws IOException {
        ColumnIdentifier columnIdentifier = new ColumnIdentifier(columnId, 
null, null);
     DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
-        new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, 
columnIdentifier,
+        new DictionaryColumnUniqueIdentifier(identifier, columnIdentifier,
             columnIdentifier.getDataType());
     CarbonDictionaryWriter carbonDictionaryWriter =
         new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    
CarbonUtil.checkAndCreateFolder(carbonTablePath.getMetadataDirectoryPath());
+    
CarbonUtil.checkAndCreateFolder(CarbonTablePath.getMetadataPath(identifier.getTablePath()));
     List<byte[]> valueList = convertStringListToByteArray(data);
     try {
       carbonDictionaryWriter.write(valueList);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
 
b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
index d0aedd4..c36c89d 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
@@ -56,7 +56,7 @@ public class ForwardDictionaryCacheTest extends 
AbstractDictionaryCacheTest {
     this.carbonStorePath = props.getProperty("storePath", "carbonStore");
     carbonTableIdentifier =
         new CarbonTableIdentifier(databaseName, tableName, 
UUID.randomUUID().toString());
-    absoluteTableIdentifier =
+    identifier =
         AbsoluteTableIdentifier.from(carbonStorePath + "/" + databaseName + 
"/" + tableName,
             carbonTableIdentifier);
     columnIdentifiers = new String[] { "name", "place" };
@@ -67,7 +67,7 @@ public class ForwardDictionaryCacheTest extends 
AbstractDictionaryCacheTest {
 
   @After public void tearDown() throws Exception {
     carbonTableIdentifier = null;
-    absoluteTableIdentifier = null;
+    identifier = null;
     forwardDictionaryCache = null;
     deleteStorePath();
   }
@@ -217,7 +217,7 @@ public class ForwardDictionaryCacheTest extends 
AbstractDictionaryCacheTest {
   private void writeSortIndexFile(List<String> data, String columnId) throws 
IOException {
        ColumnIdentifier columnIdentifier = new ColumnIdentifier(columnId, 
null, null);
     DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
-        new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, 
columnIdentifier,
+        new DictionaryColumnUniqueIdentifier(identifier, columnIdentifier,
             columnIdentifier.getDataType());
     Map<String, Integer> dataToSurrogateKeyMap = new HashMap<>(data.size());
     int surrogateKey = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
 
b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
index 01cb3a9..d2bf2e3 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
@@ -58,7 +58,7 @@ public class ReverseDictionaryCacheTest extends 
AbstractDictionaryCacheTest {
     this.carbonStorePath = props.getProperty("storePath", "carbonStore");
     carbonTableIdentifier =
         new CarbonTableIdentifier(databaseName, tableName, 
UUID.randomUUID().toString());
-    absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+    identifier = AbsoluteTableIdentifier.from(
         carbonStorePath + "/" + databaseName + "/" + tableName, 
carbonTableIdentifier);
     columnIdentifiers = new String[] { "name", "place" };
     deleteStorePath();
@@ -69,7 +69,7 @@ public class ReverseDictionaryCacheTest extends 
AbstractDictionaryCacheTest {
   @After public void tearDown() throws Exception {
     carbonTableIdentifier = null;
     reverseDictionaryCache = null;
-    absoluteTableIdentifier = null;
+    identifier = null;
     deleteStorePath();
   }
 
@@ -276,6 +276,6 @@ public class ReverseDictionaryCacheTest extends 
AbstractDictionaryCacheTest {
   protected DictionaryColumnUniqueIdentifier 
createDictionaryColumnUniqueIdentifier(
              String columnId) {
            ColumnIdentifier columnIdentifier = new ColumnIdentifier(columnId, 
null, DataTypes.DOUBLE);
-    return new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, 
columnIdentifier);
+    return new DictionaryColumnUniqueIdentifier(identifier, columnIdentifier);
          }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
 
b/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
index d3c3bc3..ecabfd4 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
@@ -28,8 +28,6 @@ import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.service.CarbonCommonFactory;
-import org.apache.carbondata.core.service.PathService;
-import org.apache.carbondata.core.service.impl.PathFactory;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -58,12 +56,6 @@ public class CarbonDictionaryReaderImplTest {
   }
 
   @Test public void testRead() throws Exception {
-    new MockUp<CarbonCommonFactory>() {
-      @Mock public PathService getPathService() {
-
-        return new PathFactory();
-      }
-    };
     new MockUp<CarbonDictionaryMetadataReaderImpl>() {
       @Mock public List<CarbonDictionaryColumnMetaChunk> read() throws 
IOException {
         CarbonDictionaryColumnMetaChunk carbonDictionaryColumnMetaChunks =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index a1ccab3..4293536 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -39,21 +39,19 @@ public class CarbonFormatDirectoryStructureTest {
    */
   @Test public void testTablePathStructure() throws IOException {
     CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("d1", 
"t1", UUID.randomUUID().toString());
-    CarbonStorePath carbonStorePath = new CarbonStorePath(CARBON_STORE);
-    AbsoluteTableIdentifier absoluteTableIdentifier =
+    AbsoluteTableIdentifier identifier =
         AbsoluteTableIdentifier.from(CARBON_STORE + "/d1/t1", tableIdentifier);
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    assertTrue(carbonTablePath.getPath().replace("\\", 
"/").equals(CARBON_STORE + "/d1/t1"));
-    assertTrue(carbonTablePath.getSchemaFilePath().replace("\\", 
"/").equals(CARBON_STORE + "/d1/t1/Metadata/schema"));
-    assertTrue(carbonTablePath.getTableStatusFilePath().replace("\\", "/")
+    assertTrue(identifier.getTablePath().replace("\\", 
"/").equals(CARBON_STORE + "/d1/t1"));
+    
assertTrue(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()).replace("\\",
 "/").equals(CARBON_STORE + "/d1/t1/Metadata/schema"));
+    
assertTrue(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()).replace("\\",
 "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/tablestatus"));
-    assertTrue(carbonTablePath.getDictionaryFilePath("t1_c1").replace("\\", 
"/")
+    
assertTrue(CarbonTablePath.getDictionaryFilePath(identifier.getTablePath(), 
"t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dict"));
-    
assertTrue(carbonTablePath.getDictionaryMetaFilePath("t1_c1").replace("\\", "/")
+    
assertTrue(CarbonTablePath.getDictionaryMetaFilePath(identifier.getTablePath(), 
"t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
-    assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
+    
assertTrue(CarbonTablePath.getSortIndexFilePath(identifier.getTablePath(),"t1_c1").replace("\\",
 "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(carbonTablePath.getCarbonDataFilePath("2", 3, 4L,  0, 0, 
"999").replace("\\", "/")
+    
assertTrue(CarbonTablePath.getCarbonDataFilePath(identifier.getTablePath(), 
"2", 3, 4L,  0, 0, "999").replace("\\", "/")
         .equals(CARBON_STORE + 
"/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata"));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatSharedDictionaryTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatSharedDictionaryTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatSharedDictionaryTest.java
deleted file mode 100644
index 91384c1..0000000
--- 
a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatSharedDictionaryTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.util.path;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-import static junit.framework.TestCase.assertTrue;
-
-/**
- * test shared dictionary paths
- */
-public class CarbonFormatSharedDictionaryTest {
-
-  private final String CARBON_STORE = "/opt/carbonstore";
-
-  /**
-   * test shared dictionary location
-   */
-  @Test public void testSharedDimentionLocation() throws IOException {
-    assertTrue(CarbonSharedDictionaryPath.getDictionaryFilePath(CARBON_STORE, 
"d1", "shared_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/SharedDictionary/shared_c1.dict"));
-    
assertTrue(CarbonSharedDictionaryPath.getDictionaryMetaFilePath(CARBON_STORE, 
"d1", "shared_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/SharedDictionary/shared_c1.dictmeta"));
-    assertTrue(CarbonSharedDictionaryPath.getSortIndexFilePath(CARBON_STORE, 
"d1", "shared_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/SharedDictionary/shared_c1.sortindex"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
 
b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
index 308d041..d5500e1 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
@@ -44,7 +44,6 @@ import 
org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
 import org.apache.carbondata.core.reader.CarbonDictionaryReaderImpl;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
 
@@ -72,7 +71,7 @@ public class CarbonDictionaryWriterImplTest {
 
   private String tableName;
 
-  private String carbonStorePath;
+  private String tablePath;
 
   private ColumnIdentifier columnIdentifier;
 
@@ -100,10 +99,10 @@ public class CarbonDictionaryWriterImplTest {
     init();
     this.databaseName = props.getProperty("database", "testSchema");
     this.tableName = props.getProperty("tableName", "carbon");
-    this.carbonStorePath = props.getProperty("storePath", "carbonStore");
+    this.tablePath = props.getProperty("storePath", "carbonStore");
     this.columnIdentifier = new ColumnIdentifier("Name", null, null);
     carbonTableIdentifier = new CarbonTableIdentifier(databaseName, tableName, 
UUID.randomUUID().toString());
-    absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonStorePath, 
carbonTableIdentifier);
+    absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, 
carbonTableIdentifier);
     this.dictionaryColumnUniqueIdentifier =
         new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, 
columnIdentifier,
             columnIdentifier.getDataType());
@@ -479,8 +478,8 @@ public class CarbonDictionaryWriterImplTest {
    * this method will delete the store path
    */
   private void deleteStorePath() {
-    FileFactory.FileType fileType = 
FileFactory.getFileType(this.carbonStorePath);
-    CarbonFile carbonFile = FileFactory.getCarbonFile(this.carbonStorePath, 
fileType);
+    FileFactory.FileType fileType = FileFactory.getFileType(this.tablePath);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(this.tablePath, 
fileType);
     deleteRecursiveSilent(carbonFile);
   }
 
@@ -528,14 +527,12 @@ public class CarbonDictionaryWriterImplTest {
    * this method will form the dictionary directory paths
    */
   private void initDictionaryDirPaths() throws IOException {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(this.carbonStorePath, 
carbonTableIdentifier);
-    String dictionaryLocation = carbonTablePath.getMetadataDirectoryPath();
+    String dictionaryLocation = CarbonTablePath.getMetadataPath(tablePath);
     FileFactory.FileType fileType = 
FileFactory.getFileType(dictionaryLocation);
     if(!FileFactory.isFileExist(dictionaryLocation, fileType)) {
       FileFactory.mkdirs(dictionaryLocation, fileType);
     }
-    this.dictionaryFilePath = 
carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
-    this.dictionaryMetaFilePath = 
carbonTablePath.getDictionaryMetaFilePath(columnIdentifier.getColumnId());
+    this.dictionaryFilePath = CarbonTablePath.getDictionaryFilePath(tablePath, 
columnIdentifier.getColumnId());
+    this.dictionaryMetaFilePath = 
CarbonTablePath.getDictionaryMetaFilePath(tablePath, 
columnIdentifier.getColumnId());
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
index ef4dbce..bcbf190 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
@@ -24,10 +24,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-
 /**
  * This example introduces how to use CarbonData batch load to integrate
  * with Spark Streaming(it's DStream, not Spark Structured Streaming)
@@ -74,7 +70,6 @@ object CarbonBatchSparkStreamingExample {
            | """.stripMargin)
 
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // batch load
       val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -88,7 +83,7 @@ object CarbonBatchSparkStreamingExample {
       val serverSocket = new ServerSocket(7071)
       val thread1 = writeSocket(serverSocket)
       val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, tablePath, 
checkpointPath)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
       // wait for stop signal to stop Spark Streaming App
       waitForStopSignal(ssc)
       // it need to start Spark Streaming App in main thread
@@ -153,7 +148,7 @@ object CarbonBatchSparkStreamingExample {
   }
 
   def startStreaming(spark: SparkSession, tableName: String,
-      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+      checkpointPath: String): StreamingContext = {
     var ssc: StreamingContext = null
     try {
       // recommend: the batch interval must set larger, such as 30s, 1min.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
index f59a610..63b1c5a 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -24,14 +24,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.CarbonSparkStreamingFactory
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.CarbonSparkStreamingListener
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
@@ -77,7 +74,6 @@ object CarbonStreamSparkStreamingExample {
            | 'dictionary_include'='city')
            | """.stripMargin)
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // batch load
       val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -91,7 +87,7 @@ object CarbonStreamSparkStreamingExample {
       val serverSocket = new ServerSocket(7071)
       val thread1 = writeSocket(serverSocket)
       val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, tablePath, 
checkpointPath)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
       // add a Spark Streaming Listener to remove all lock for stream tables 
when stop app
       ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
       // wait for stop signal to stop Spark Streaming App
@@ -156,7 +152,7 @@ object CarbonStreamSparkStreamingExample {
   }
 
   def startStreaming(spark: SparkSession, tableName: String,
-      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+      checkpointPath: String): StreamingContext = {
     var ssc: StreamingContext = null
     try {
       // recommend: the batch interval must set larger, such as 30s, 1min.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
index 8ce4afc..bc65b2f 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
@@ -23,7 +23,8 @@ import java.net.ServerSocket
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 // scalastyle:off println
 object CarbonStructuredStreamingExample {
@@ -73,7 +74,6 @@ object CarbonStructuredStreamingExample {
       }
 
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // batch load
       val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -85,7 +85,7 @@ object CarbonStructuredStreamingExample {
 
       // streaming ingest
       val serverSocket = new ServerSocket(7071)
-      val thread1 = startStreaming(spark, tablePath)
+      val thread1 = startStreaming(spark, carbonTable)
       val thread2 = writeSocket(serverSocket)
       val thread3 = showTableCount(spark, streamTableName)
 
@@ -136,7 +136,7 @@ object CarbonStructuredStreamingExample {
     thread
   }
 
-  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread 
= {
+  def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = {
     val thread = new Thread() {
       override def run(): Unit = {
         var qry: StreamingQuery = null
@@ -151,7 +151,8 @@ object CarbonStructuredStreamingExample {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation",
+              
CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
             .option("dbName", "default")
             .option("tableName", "stream_table")
             .start()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
index cce833b..9ca0e07 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -23,7 +23,7 @@ import java.net.ServerSocket
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
 case class FileElement(school: Array[String], age: Int)
@@ -77,7 +77,6 @@ object CarbonStructuredStreamingWithRowParser {
       }
 
       val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // batch load
       val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
       spark.sql(
@@ -89,7 +88,7 @@ object CarbonStructuredStreamingWithRowParser {
 
       // streaming ingest
       val serverSocket = new ServerSocket(7071)
-      val thread1 = startStreaming(spark, tablePath)
+      val thread1 = startStreaming(spark, carbonTable.getTablePath)
       val thread2 = writeSocket(serverSocket)
       val thread3 = showTableCount(spark, streamTableName)
 
@@ -140,7 +139,7 @@ object CarbonStructuredStreamingWithRowParser {
     thread
   }
 
-  def startStreaming(spark: SparkSession, tablePath: CarbonTablePath): Thread 
= {
+  def startStreaming(spark: SparkSession, tablePath: String): Thread = {
     val thread = new Thread() {
       override def run(): Unit = {
         var qry: StreamingQuery = null
@@ -167,7 +166,7 @@ object CarbonStructuredStreamingWithRowParser {
           qry = readSocketDF.writeStream
             .format("carbondata")
             .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+            .option("checkpointLocation", 
CarbonTablePath.getStreamingCheckpointDir(tablePath))
             .option("dbName", "default")
             .option("tableName", "stream_table_with_row_parser")
             .option(CarbonStreamParser.CARBON_STREAM_PARSER,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 5a20d7e..9b86e4f 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -71,7 +71,6 @@ import 
org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeConverter;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
@@ -250,10 +249,6 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     }
   }
 
-  private static CarbonTablePath getTablePath(AbsoluteTableIdentifier 
absIdentifier) {
-    return CarbonStorePath.getCarbonTablePath(absIdentifier);
-  }
-
   /**
    * Set list of segments to access
    */
@@ -499,11 +494,10 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
 
-      CarbonTablePath tablePath = 
CarbonStorePath.getCarbonTablePath(identifier);
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (Segment segment : streamSegments) {
-        String segmentDir = tablePath.getSegmentDir(segment.getSegmentNo());
+        String segmentDir = 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), 
segment.getSegmentNo());
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
           String indexName = CarbonTablePath.getCarbonStreamIndexFileName();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 3ef8afc..a4b3be8 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -37,7 +37,6 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.FileHeader;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
@@ -127,9 +126,8 @@ public class CarbonStreamRecordWriter extends 
RecordWriter<Void, Object> {
     maxCacheSize = 
hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
         CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
 
-    CarbonTablePath tablePath =
-        
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
-    segmentDir = tablePath.getSegmentDir(segmentId);
+    segmentDir = CarbonTablePath.getSegmentPath(
+        carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
     fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index 423bb2a..dfa8dd1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -27,7 +27,7 @@ import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
@@ -37,8 +37,7 @@ public class SchemaReader {
 
   public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier 
identifier)
       throws IOException {
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(identifier);
-    String schemaFilePath = carbonTablePath.getSchemaFilePath();
+    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
     if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
         FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
         FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) ||
@@ -46,7 +45,7 @@ public class SchemaReader {
       String tableName = identifier.getCarbonTableIdentifier().getTableName();
 
       org.apache.carbondata.format.TableInfo tableInfo =
-          CarbonUtil.readSchemaFile(carbonTablePath.getSchemaFilePath());
+          
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()));
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
       TableInfo wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(
           tableInfo,
@@ -63,22 +62,21 @@ public class SchemaReader {
   /**
    * the method returns the Wrapper TableInfo
    *
-   * @param absoluteTableIdentifier
+   * @param identifier
    * @return
    */
-  public static TableInfo getTableInfo(AbsoluteTableIdentifier 
absoluteTableIdentifier)
+  public static TableInfo getTableInfo(AbsoluteTableIdentifier identifier)
       throws IOException {
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     org.apache.carbondata.format.TableInfo thriftTableInfo =
-        CarbonUtil.readSchemaFile(carbonTablePath.getSchemaFilePath());
+        
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()));
     ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
         new ThriftWrapperSchemaConverterImpl();
     CarbonTableIdentifier carbonTableIdentifier =
-        absoluteTableIdentifier.getCarbonTableIdentifier();
+        identifier.getCarbonTableIdentifier();
     return thriftWrapperSchemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,
         carbonTableIdentifier.getDatabaseName(),
         carbonTableIdentifier.getTableName(),
-        absoluteTableIdentifier.getTablePath());
+        identifier.getTablePath());
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index d7d04b2..8eb9a77 100644
--- 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -65,7 +65,6 @@ import 
org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
@@ -188,11 +187,11 @@ public class StoreCreator {
   }
 
   public static CarbonTable createTable(
-      AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
+      AbsoluteTableIdentifier identifier) throws IOException {
     TableInfo tableInfo = new TableInfo();
-    
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+    
tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
     TableSchema tableSchema = new TableSchema();
-    
tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+    
tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
     List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
     ArrayList<Encoding> encodings = new ArrayList<>();
     encodings.add(Encoding.DICTIONARY);
@@ -284,13 +283,12 @@ public class StoreCreator {
     tableSchema.setSchemaEvalution(schemaEvol);
     tableSchema.setTableId(UUID.randomUUID().toString());
     tableInfo.setTableUniqueName(
-        absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+        identifier.getCarbonTableIdentifier().getTableUniqueName()
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
-    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String schemaFilePath = carbonTablePath.getSchemaFilePath();
+    tableInfo.setTablePath(identifier.getTablePath());
+    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
     String schemaMetadataPath = 
CarbonTablePath.getFolderContainingFile(schemaFilePath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
@@ -469,7 +467,7 @@ public class StoreCreator {
     
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
-    String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + 
File.separator
+    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + 
File.separator
         + CarbonTablePath.TABLE_STATUS_FILE;
 
     DataOutputStream dataOutputStream;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 5a2f831..8f7e88c 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -25,7 +25,6 @@ import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.service.impl.PathFactory;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
@@ -82,7 +81,8 @@ public class CarbondataRecordSetProvider implements 
ConnectorRecordSetProvider {
         carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
     checkNotNull(tableCacheModel, "tableCacheModel should not be null");
     checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable 
should not be null");
-    checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should 
not be null");
+    checkNotNull(
+        tableCacheModel.carbonTable.getTableInfo(), "tableCacheModel.tableInfo 
should not be null");
 
     // Build Query Model
     CarbonTable targetTable = tableCacheModel.carbonTable;
@@ -92,8 +92,7 @@ public class CarbondataRecordSetProvider implements 
ConnectorRecordSetProvider {
     try {
       Configuration conf = new Configuration();
       conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
-      String carbonTablePath = PathFactory.getInstance()
-          .getCarbonTablePath(targetTable.getAbsoluteTableIdentifier(), 
null).getPath();
+      String carbonTablePath = 
targetTable.getAbsoluteTableIdentifier().getTablePath();
 
       conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
       JobConf jobConf = new JobConf(conf);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index 8422c3e..4984406 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -17,27 +17,18 @@
 
 package org.apache.carbondata.presto.impl;
 
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
- * Caching metadata of CarbonData(e.g. TableIdentifier, TablePath, TableInfo, 
CarbonTable) in Class CarbonTableReader
+ * Caching metadata of CarbonData in Class CarbonTableReader
  * to speed up query
  */
 public class CarbonTableCacheModel {
 
-  public CarbonTableIdentifier carbonTableIdentifier;
-  public CarbonTablePath carbonTablePath;
-
-  public TableInfo tableInfo;
   public CarbonTable carbonTable;
 
   public boolean isValid() {
-    if (carbonTable != null && carbonTablePath != null && 
carbonTableIdentifier != null)
-      return true;
-    else return false;
+    return carbonTable != null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index b0271ef..09389f8 100755
--- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ 
b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -42,7 +42,6 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.service.impl.PathFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
@@ -243,9 +242,10 @@ public class CarbonTableReader {
       updateSchemaList();
     }
     try {
-      if (isKeyExists && !FileFactory
-          
.isFileExist(carbonCache.get().get(schemaTableName).carbonTablePath.getSchemaFilePath(),
-              fileType)) {
+      if (isKeyExists
+          && !FileFactory.isFileExist(
+              CarbonTablePath.getSchemaFilePath(
+                  
carbonCache.get().get(schemaTableName).carbonTable.getTablePath()), fileType)) {
         removeTableFromCache(schemaTableName);
         throw new TableNotFoundException(schemaTableName);
       }
@@ -255,10 +255,12 @@ public class CarbonTableReader {
 
     if (isKeyExists) {
       CarbonTableCacheModel ctcm = carbonCache.get().get(schemaTableName);
-      if(ctcm != null && ctcm.tableInfo != null) {
-        Long latestTime = 
FileFactory.getCarbonFile(ctcm.carbonTablePath.getSchemaFilePath())
-            .getLastModifiedTime();
-        Long oldTime = ctcm.tableInfo.getLastUpdatedTime();
+      if(ctcm != null && ctcm.carbonTable.getTableInfo() != null) {
+        Long latestTime = FileFactory.getCarbonFile(
+            CarbonTablePath.getSchemaFilePath(
+                
carbonCache.get().get(schemaTableName).carbonTable.getTablePath())
+        ).getLastModifiedTime();
+        Long oldTime = ctcm.carbonTable.getTableInfo().getLastUpdatedTime();
         if (DateUtils.truncate(new Date(latestTime), Calendar.MINUTE)
             .after(DateUtils.truncate(new Date(oldTime), Calendar.MINUTE))) {
           removeTableFromCache(schemaTableName);
@@ -312,21 +314,12 @@ public class CarbonTableReader {
 
       // Step 1: get store path of the table and cache it.
       // create table identifier. the table id is randomly generated.
-      cache.carbonTableIdentifier =
+      CarbonTableIdentifier carbonTableIdentifier =
               new CarbonTableIdentifier(table.getSchemaName(), 
table.getTableName(),
                       UUID.randomUUID().toString());
       String storePath = config.getStorePath();
-      String tablePath = storePath + "/" + 
cache.carbonTableIdentifier.getDatabaseName() + "/"
-          + cache.carbonTableIdentifier.getTableName();
-
-      // get the store path of the table.
-
-      AbsoluteTableIdentifier absoluteTableIdentifier =
-          AbsoluteTableIdentifier.from(tablePath, cache.carbonTableIdentifier);
-      cache.carbonTablePath =
-          
PathFactory.getInstance().getCarbonTablePath(absoluteTableIdentifier, null);
-      // cache the table
-      carbonCache.get().put(table, cache);
+      String tablePath = storePath + "/" + 
carbonTableIdentifier.getDatabaseName() + "/"
+          + carbonTableIdentifier.getTableName();
 
       //Step 2: read the metadata (tableInfo) of the table.
       ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
@@ -338,7 +331,7 @@ public class CarbonTableReader {
         }
       };
       ThriftReader thriftReader =
-              new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), 
createTBase);
+              new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), 
createTBase);
       thriftReader.open();
       org.apache.carbondata.format.TableInfo tableInfo =
               (org.apache.carbondata.format.TableInfo) thriftReader.read();
@@ -355,9 +348,12 @@ public class CarbonTableReader {
       // Step 4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
 
-      cache.tableInfo = wrapperTableInfo;
-      cache.carbonTable = CarbonMetadata.getInstance()
-              
.getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+      cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+          table.getSchemaName(), table.getTableName());
+
+      // cache the table
+      carbonCache.get().put(table, cache);
+
       result = cache.carbonTable;
     } catch (Exception ex) {
       throw new RuntimeException(ex);
@@ -372,11 +368,10 @@ public class CarbonTableReader {
     List<CarbonLocalInputSplit> result = new ArrayList<>();
 
     CarbonTable carbonTable = tableCacheModel.carbonTable;
-    TableInfo tableInfo = tableCacheModel.tableInfo;
+    TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
     Configuration config = new Configuration();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
-    String carbonTablePath = PathFactory.getInstance()
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier(), 
null).getPath();
+    String carbonTablePath = 
carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
     config.set(CarbonTableInputFormat.DATABASE_NAME, 
carbonTable.getDatabaseName());
     config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git 
a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
 
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index aed3b09..e768660 100644
--- 
a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ 
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -52,7 +52,7 @@ import 
org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolut
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata,
 CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import 
org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter,
 CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, 
CarbonDictionarySortInfoPreparator}
@@ -323,10 +323,8 @@ object CarbonDataStoreCreator {
     )
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
-    val carbonTablePath: CarbonTablePath = CarbonStorePath.getCarbonTablePath(
-      absoluteTableIdentifier.getTablePath,
-      absoluteTableIdentifier.getCarbonTableIdentifier)
-    val schemaFilePath: String = carbonTablePath.getSchemaFilePath
+    val schemaFilePath: String = CarbonTablePath.getSchemaFilePath(
+      absoluteTableIdentifier.getTablePath)
     val schemaMetadataPath: String =
       CarbonTablePath.getFolderContainingFile(schemaFilePath)
     CarbonMetadata.getInstance.loadTableMetadata(tableInfo)
@@ -533,7 +531,7 @@ object CarbonDataStoreCreator {
       loadMetadataDetails.setLoadStartTime(
         loadMetadataDetails.getTimeStamp(readCurrentTime()))
       listOfLoadFolderDetails.add(loadMetadataDetails)
-      val dataLoadLocation: String = schema.getCarbonTable.getMetaDataFilepath 
+ File.separator +
+      val dataLoadLocation: String = schema.getCarbonTable.getMetadataPath + 
File.separator +
                                      CarbonTablePath.TABLE_STATUS_FILE
       val gsonObjectToWrite: Gson = new Gson()
       val writeOperation: AtomicFileOperations = new AtomicFileOperationsImpl(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 2d5b64b..c0abe4e 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -18,18 +18,14 @@
 
 package org.apache.carbondata.cluster.sdv.generated
 
-import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.common.util._
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Test Class for AlterTableTestCase to verify all scenarios
@@ -56,7 +52,6 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE 
carbon_automation_merge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 
rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
 
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", 
table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
@@ -74,7 +69,6 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 
2)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 
2)
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", 
table.getTablePath, false)
     new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("1", 
table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 
0)
@@ -96,7 +90,6 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 
2)
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0.1", 
table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") 
== 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), 
rows)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 82dec39..e5075ef 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.spark.sql.test.util.QueryTest
@@ -48,13 +48,13 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
       datbaseName: String,
       tableName: String): Boolean = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName, 
tableName)
-    val partitionPath = CarbonStorePath
-      
.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
+    val partitionPath =
+      
CarbonTablePath.getPartitionDir(carbonTable.getAbsoluteTableIdentifier.getTablePath)
     val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
     val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
     val segments: ArrayBuffer[String] = ArrayBuffer()
     carbonFile.listFiles.foreach { file =>
-      segments += 
CarbonTablePath.DataPathUtil.getSegmentId(file.getAbsolutePath + "/dummy")
+      segments += 
CarbonTablePath.DataFileUtil.getSegmentId(file.getAbsolutePath + "/dummy")
     }
     segments.contains(segmentId)
   }
@@ -236,8 +236,7 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
     sql("create table stale(a string) stored by 'carbondata'")
     sql("insert into stale values('k')")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
"stale")
-    val tableStatusFile = new CarbonTablePath(null,
-      carbonTable.getTablePath).getTableStatusFilePath
+    val tableStatusFile = 
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
     FileFactory.getCarbonFile(tableStatusFile).delete()
     sql("insert into stale values('k')")
     checkAnswer(sql("select * from stale"), Row("k"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 5cc4156..3c2fd71 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -29,7 +29,7 @@ import 
org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   var timeStampPropOrig: String = _
@@ -231,8 +231,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("insert overwrite table HiveOverwrite select * from THive")
     checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select 
count(*) from HiveOverwrite"))
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
"carbonoverwrite")
-    val partitionPath = CarbonStorePath
-      
.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
+    val partitionPath = 
CarbonTablePath.getPartitionDir(carbonTable.getAbsoluteTableIdentifier.getTablePath)
     val folder = new File(partitionPath)
     assert(folder.isDirectory)
     assert(folder.list().length == 1)
@@ -254,8 +253,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' overwrite INTO 
TABLE HiveOverwrite")
     checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), 
sql("select count(*) from HiveOverwrite"))
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
"tcarbonsourceoverwrite")
-    val partitionPath = CarbonStorePath
-      
.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
+    val partitionPath = 
CarbonTablePath.getPartitionDir(carbonTable.getAbsoluteTableIdentifier.getTablePath)
     val folder = new File(partitionPath)
 
     assert(folder.isDirectory)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
index 09268b5..8315848 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableAsSelect.scala
@@ -142,7 +142,7 @@ class TestCreateTableAsSelect extends QueryTest with 
BeforeAndAfterAll {
     val carbonTable = 
CarbonEnv.getInstance(Spark2TestQueryExecutor.spark).carbonMetastore
       .lookupRelation(Option("default"), 
"ctas_tblproperties_test")(Spark2TestQueryExecutor.spark)
       .asInstanceOf[CarbonRelation].carbonTable
-    val metadataFolderPath: CarbonFile = 
FileFactory.getCarbonFile(carbonTable.getMetaDataFilepath)
+    val metadataFolderPath: CarbonFile = 
FileFactory.getCarbonFile(carbonTable.getMetadataPath)
     assert(metadataFolderPath.exists())
     val dictFiles: Array[CarbonFile] = metadataFolderPath.listFiles(new 
CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index 2ef88a4..a7607c3 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.test.util.QueryTest
 
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -43,9 +43,7 @@ class DataCompactionLockTest extends QueryTest with 
BeforeAndAfterAll {
         new CarbonTableIdentifier(
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, 
"compactionlocktesttable", "1")
       )
-  val carbonTablePath: CarbonTablePath = CarbonStorePath
-    .getCarbonTablePath(absoluteTableIdentifier)
-  val dataPath: String = carbonTablePath.getMetadataDirectoryPath
+  val dataPath: String = 
CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath)
 
   val carbonLock: ICarbonLock =
     CarbonLockFactory

Reply via email to