http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 31a08fc..9afb890 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CacheClient
 import org.apache.spark.sql.test.util.QueryTest
 
@@ -112,11 +112,9 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       "ignoremajor"
     )
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
 
-    val carbontablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-      .getMetadataDirectoryPath
-    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segs = SegmentStatusManager.readLoadMetadata(carbonTablePath)
 
     // status should remain as compacted.
     assertResult(SegmentStatus.COMPACTED)(segs(3).getSegmentStatus)
@@ -134,9 +132,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest 
with BeforeAndAfterAll
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       "ignoremajor"
     )
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbontablePath = CarbonStorePath
-      .getCarbonTablePath(absoluteTableIdentifier).getMetadataDirectoryPath
+    val carbontablePath = carbonTable.getMetadataPath
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted for segment 2.

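The change above is the recurring pattern of this patch: the two-step CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath lookup is replaced by the CarbonTable.getMetadataPath accessor. A minimal sketch of the new call shape (assuming a build that contains this change; the helper name readSegments is illustrative, not part of the patch):

import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.statusmanager.SegmentStatusManager

// Illustrative only: read the segment (load) metadata for a table by
// resolving the metadata directory straight from the CarbonTable instance.
def readSegments(dbName: String, tableName: String) = {
  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
  SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
}
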
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 42ac4df..68a3058 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
   var filePath: String = _
@@ -193,8 +193,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       tableName
     )
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
+    val segmentDir = carbonTable.getSemgentPath(segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index db0a62c..b9d8e12 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -22,7 +22,7 @@ import java.io.{File, FilenameFilter}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.reader.CarbonIndexFileReader
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -48,8 +48,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table test_table_v3")
     val indexReader = new CarbonIndexFileReader()
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0")
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
     val carbonIndexPaths = new File(segmentDir)
       .listFiles(new FilenameFilter {
         override def accept(dir: File, name: String): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 479db50..cbbb191 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -32,8 +32,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   var filePath: String = s"$resourcesPath/globalsort"
@@ -271,8 +271,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     }
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getSegmentDir("0")
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
     assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
   }
 
@@ -378,8 +377,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 }

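Both data-load suites above converge on the same helper shape for counting index files in a segment; a sketch of that shape using the static CarbonTablePath.getSegmentPath introduced by this patch (the "default" database and the helper name are illustrative):

import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.path.CarbonTablePath

// Illustrative only: resolve the segment directory from the table path and
// count the carbon index files inside it.
def indexFileCount(tableName: String, segmentNo: String = "0"): Int = {
  val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
  val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
  new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
}
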
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index ed58253..7c82f75 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.time.DateUtils
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -79,8 +79,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
       "dataRetentionTable"
     )
     absoluteTableIdentifierForRetention = carbonTable2.getAbsoluteTableIdentifier
-    carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(absoluteTableIdentifierForRetention).getMetadataDirectoryPath
+    carbonTablePath = CarbonTablePath
+      .getMetadataPath(absoluteTableIdentifierForRetention.getTablePath)
     carbonTableStatusLock = CarbonLockFactory
       .getCarbonLockObj(absoluteTableIdentifierForLock, LockUsage.TABLE_STATUS_LOCK)
     carbonDeleteSegmentLock= CarbonLockFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 0a21aed..e5de8da 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -61,9 +61,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
 
   def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
+    val segmentDir = carbonTable.getSemgentPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index c8f7be3..2ce46ef 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -72,8 +72,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
 
   def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
     val partitions = CarbonFilters
       .getPartitions(Seq.empty,
         sqlContext.sparkSession,
@@ -334,9 +332,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE mergeindexpartitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-        carbonTable.getTablePath)
-    val details = SegmentStatusManager.readTableStatusFile(tablePath.getTableStatusFilePath)
+    val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
     store.readIndexFiles()
     store.getIndexFiles

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 5fc7e3d..8adcb11 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -177,7 +177,7 @@ object CarbonStore {
     LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
     validateLoadIds(loadids)
 
-    val path = carbonTable.getMetaDataFilepath
+    val path = carbonTable.getMetadataPath
 
     try {
       val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
@@ -203,7 +203,7 @@ object CarbonStore {
     LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
 
     val time = validateTimeFormat(timestamp)
-    val path = carbonTable.getMetaDataFilepath
+    val path = carbonTable.getMetadataPath
 
     try {
       val invalidLoadTimestamps =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
index 32d121e..3dd9903 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -30,7 +30,7 @@ trait ColumnValidator {
  */
 trait DictionaryDetailService {
   def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail
+      tablePath: String): DictionaryDetail
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
index e861a8c..dbf47ab 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
@@ -23,12 +23,11 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class DictionaryDetailHelper extends DictionaryDetailService {
-  def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
+  override def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
+      tablePath: String): DictionaryDetail = {
     val dictFilePaths = new Array[String](primDimensions.length)
     val dictFileExists = new Array[Boolean](primDimensions.length)
     val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
@@ -50,7 +49,7 @@ class DictionaryDetailHelper extends DictionaryDetailService {
     // 3 lookup fileNamesMap, if file name is in fileNamesMap, file is exists, or not.
     primDimensions.zipWithIndex.foreach { f =>
       columnIdentifier(f._2) = f._1.getColumnIdentifier
-      dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
+      dictFilePaths(f._2) = CarbonTablePath.getDictionaryFilePath(tablePath, f._1.getColumnId)
       dictFileExists(f._2) =
         fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
           case None => false

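The signature change above drops the CarbonTableIdentifier/store-path pair in favour of the table path alone. A sketch of the resulting lookup, under the same assumption that the static helpers from this patch are on the classpath (the helper name is illustrative):

import org.apache.carbondata.core.util.path.CarbonTablePath

// Illustrative only: dictionary file path and file name for one column id,
// both derived statically from the table path.
def dictionaryFileFor(tablePath: String, columnId: String): (String, String) =
  (CarbonTablePath.getDictionaryFilePath(tablePath, columnId),
    CarbonTablePath.getDictionaryFileName(columnId))
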
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index f2f4ecd..56a66b9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
 
 /**
@@ -49,7 +49,7 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
  */
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
-    absoluteTableIdentifier: AbsoluteTableIdentifier)
+    identifier: AbsoluteTableIdentifier)
   extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
@@ -70,8 +70,6 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
         // create dictionary file if it is a dictionary column
         if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
             !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-          val carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier)
           var rawData: String = null
           if (null != columnSchema.getDefaultValue) {
             rawData = new String(columnSchema.getDefaultValue,
@@ -79,16 +77,15 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
           }
           CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE, lockType)
           // Create table and metadata folders if not exist
-          val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+          val metadataDirectoryPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
           val fileType = FileFactory.getFileType(metadataDirectoryPath)
           if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
             FileFactory.mkdirs(metadataDirectoryPath, fileType)
           }
-          GlobalDictionaryUtil
-            .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
-              columnSchema,
-              absoluteTableIdentifier,
-              rawData)
+          GlobalDictionaryUtil.loadDefaultDictionaryValueForNewColumn(
+            columnSchema,
+            identifier,
+            rawData)
         }
       } catch {
         case ex: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index db29532..7acf4e2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -40,10 +40,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.service.{CarbonCommonFactory, PathService}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -348,10 +346,6 @@ class CarbonGlobalDictionaryGenerateRDD(
         model.table,
         model.columnIdentifier(split.index),
         model.columnIdentifier(split.index).getDataType)
-      val pathService: PathService = CarbonCommonFactory.getPathService
-      val carbonTablePath: CarbonTablePath =
-        pathService
-          .getCarbonTablePath(model.table, dictionaryColumnUniqueIdentifier)
      if (StringUtils.isNotBlank(model.hdfsTempLocation)) {
        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
          model.hdfsTempLocation)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 94668bd..7815c99 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -635,7 +635,7 @@ object CommonUtil {
 
 
   def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
-    val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
+    val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
     val details = SegmentStatusManager.readLoadMetadata(metadataPath)
     model.setLoadMetadataDetails(new util.ArrayList[LoadMetadataDetails](details.toList.asJava))
   }
@@ -866,20 +866,18 @@ object CommonUtil {
       val fileType = FileFactory.getFileType(databaseLocation)
       if (FileFactory.isFileExist(databaseLocation, fileType)) {
         val file = FileFactory.getCarbonFile(databaseLocation, fileType)
-          if (file.isDirectory) {
-            val tableFolders = file.listFiles()
-            tableFolders.foreach { tableFolder =>
-              if (tableFolder.isDirectory) {
-                val tablePath = databaseLocation +
-                                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
-                val identifier =
-                  AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
-                val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-                val tableStatusFile = carbonTablePath.getTableStatusFilePath
-                if (FileFactory.isFileExist(tableStatusFile, fileType)) {
-                  val segmentStatusManager = new SegmentStatusManager(identifier)
-                  val carbonLock = segmentStatusManager.getTableStatusLock
-                  try {
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                              CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val identifier =
+                AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile, fileType)) {
+                try {
                  val carbonTable = CarbonMetadata.getInstance
                    .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
                   DataLoadingUtil.deleteLoadsAndUpdateMetadata(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 6767ef7..cee40c8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -372,7 +372,7 @@ object DataLoadingUtil {
       isForceDeletion: Boolean,
       carbonTable: CarbonTable,
       specs: util.List[PartitionSpec]): Unit = {
-    if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
+    if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
 
       val (details, updationRequired) =
@@ -406,7 +406,7 @@ object DataLoadingUtil {
             }
             // read latest table status again.
             val latestMetadata = SegmentStatusManager
-              .readLoadMetadata(carbonTable.getMetaDataFilepath)
+              .readLoadMetadata(carbonTable.getMetadataPath)
 
             // update the metadata details from old to new status.
             val latestStatus = CarbonLoaderUtil
@@ -433,7 +433,7 @@ object DataLoadingUtil {
         if (updationCompletionStaus) {
           DeleteLoadFolders
             .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
-              carbonTable.getMetaDataFilepath, isForceDeletion, specs)
+              carbonTable.getMetadataPath, isForceDeletion, specs)
         }
       }
     }
@@ -442,14 +442,14 @@ object DataLoadingUtil {
   private def isUpdationRequired(isForceDeletion: Boolean,
       carbonTable: CarbonTable,
       absoluteTableIdentifier: AbsoluteTableIdentifier) = {
-    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
     // Delete marked loads
     val isUpdationRequired =
       DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
         absoluteTableIdentifier,
         isForceDeletion,
         details,
-        carbonTable.getMetaDataFilepath
+        carbonTable.getMetadataPath
       )
     (details, isUpdationRequired)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 9e1ece7..2bd4f45 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.reader.CarbonDictionaryReader
 import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
@@ -308,7 +308,7 @@ object GlobalDictionaryUtil {
     }
     val primDimensions = primDimensionsBuffer.map { x => x }.toArray
     val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
-      getDictionaryDetail(dictFolderPath, primDimensions, table, carbonLoadModel.getTablePath)
+      getDictionaryDetail(dictFolderPath, primDimensions, carbonLoadModel.getTablePath)
     val dictFilePaths = dictDetail.dictFilePaths
     val dictFileExists = dictDetail.dictFileExists
     val columnIdentifier = dictDetail.columnIdentifiers
@@ -398,10 +398,6 @@ object GlobalDictionaryUtil {
     }
   }
 
-  // Hack for spark2 integration
-  var updateTableMetadataFunc: (CarbonLoadModel, SQLContext, DictionaryLoadModel,
-    Array[CarbonDimension]) => Unit = _
-
   /**
    * check whether global dictionary have been generated successfully or not
    *
@@ -705,10 +701,7 @@ object GlobalDictionaryUtil {
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-      // create dictionary folder if not exists
-      val tablePath = carbonLoadModel.getTablePath
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
-      val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
+      val dictfolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
       // columns which need to generate global dictionary file
       val dimensions = carbonTable.getDimensionByTableName(
         carbonTable.getTableName).asScala.toArray
@@ -845,12 +838,11 @@ object GlobalDictionaryUtil {
   * This method will write dictionary file, sortindex file and dictionary meta for new dictionary
    * column with default value
    *
-   * @param carbonTablePath
    * @param columnSchema
    * @param absoluteTableIdentifier
    * @param defaultValue
    */
-  def loadDefaultDictionaryValueForNewColumn(carbonTablePath: CarbonTablePath,
+  def loadDefaultDictionaryValueForNewColumn(
       columnSchema: ColumnSchema,
       absoluteTableIdentifier: AbsoluteTableIdentifier,
       defaultValue: String): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 20d3032..71ce2c6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -33,12 +33,14 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
 import org.apache.carbondata.core.service.CarbonCommonFactory
+import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -208,8 +210,7 @@ class AlterTableColumnSchemaGenerator(
     alterTableModel: AlterTableAddColumnsModel,
     dbName: String,
     tableInfo: TableInfo,
-    carbonTablePath: CarbonTablePath,
-    tableIdentifier: CarbonTableIdentifier,
+    tableIdentifier: AbsoluteTableIdentifier,
     sc: SparkContext) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -370,7 +371,7 @@ object TableNewProcessor {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
     columnSchema.setEncodingList(encoders)
-    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
     val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
@@ -434,7 +435,7 @@ class TableNewProcessor(cm: TableModel) {
       }
     }
     columnSchema.setEncodingList(encoders)
-    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
     val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 3c871db..1656efa 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -176,8 +176,6 @@ object PartitionUtils {
       getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIds,
         partitionInfo, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable).asScala
     val pathList: util.List[String] = new util.ArrayList[String]()
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
-    val carbonTablePath = new CarbonTablePath(carbonTableIdentifier, tablePath)
     tableBlockInfoList.foreach{ tableBlockInfo =>
       val path = tableBlockInfo.getFilePath
       val timestamp = CarbonTablePath.DataFileUtil.getTimeStampFromFileName(path)
@@ -190,8 +188,8 @@ object PartitionUtils {
         val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
         val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
         val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
-        val indexFilePath = carbonTablePath.getCarbonIndexFilePath(
-          String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
+        val indexFilePath = CarbonTablePath.getCarbonIndexFilePath(
+          tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
           timestamp, version)
         // indexFilePath could be duplicated when multiple data file related to one index file
         if (indexFilePath != null && !pathList.contains(indexFilePath)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index c8a6b1d..4266c51 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
@@ -74,7 +74,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
           "true")
         loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath, uuid)
+          carbonTable.getMetadataPath, uuid)
         val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
           case load if loadMetaDataDetails.contains(load) =>
             load.setMergedLoadName(mergedLoadName)
@@ -83,11 +83,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
             load
           case other => other
         }
-        val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-            .getAbsoluteTableIdentifier)
-        SegmentStatusManager
-          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePathWithUUID(uuid),
+        SegmentStatusManager.writeLoadDetailsIntoFile(
+          CarbonTablePath.getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid),
            updatedLoadMetaDataDetails)
        carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
       } finally {
@@ -108,11 +105,9 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
         //  4. Therefore tablestatus file will be committed in between multiple commits.
         if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
           if (!identifySegmentsToBeMerged().isEmpty) {
-            val carbonTablePath = CarbonStorePath
-              .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-                .getAbsoluteTableIdentifier)
-            val uuidTableStaus = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
-            val tableStatus = carbonTablePath.getTableStatusFilePath
+            val uuidTableStaus = CarbonTablePath.getTableStatusFilePathWithUUID(
+              carbonTable.getTablePath, uuid)
+            val tableStatus = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
             if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
               FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
             }

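The compactor's commit step above reduces to renaming the UUID-suffixed table status file over the canonical one. In isolation that step looks roughly like this sketch (helper name illustrative; renameForce on CarbonFile as used in the hunk above):

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath

// Illustrative only: promote a UUID table status file to the canonical
// tablestatus location, mirroring the compactor's close() path.
def commitUuidTableStatus(tablePath: String, uuid: String): Unit = {
  val uuidTableStatus = CarbonTablePath.getTableStatusFilePathWithUUID(tablePath, uuid)
  val tableStatus = CarbonTablePath.getTableStatusFilePath(tablePath)
  if (!uuidTableStatus.equalsIgnoreCase(tableStatus)) {
    FileFactory.getCarbonFile(uuidTableStatus).renameForce(tableStatus)
  }
}
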
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index add038d..349c436 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -59,7 +59,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
@@ -72,7 +72,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -127,7 +127,7 @@ object CarbonDataRDDFactory {
       LOGGER.error("Not able to acquire the compaction lock for table " +
           s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
-          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+          .createCompactionRequiredFile(carbonTable.getMetadataPath, compactionType)
       // throw exception only in case of DDL trigger.
       if (compactionModel.isDDLTrigger) {
         CarbonException.analysisException(
@@ -195,7 +195,7 @@ object CarbonDataRDDFactory {
                   s"${ tableForCompaction.getDatabaseName }." +
                   s"${ tableForCompaction.getTableName}")
               val table: CarbonTable = tableForCompaction
-              val metadataPath = table.getMetaDataFilepath
+              val metadataPath = table.getMetadataPath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
 
               val newCarbonLoadModel = prepareCarbonLoadModel(table)
@@ -580,15 +580,13 @@ object CarbonDataRDDFactory {
         (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
 
       val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-        carbonTable.getMetaDataFilepath)
+        carbonTable.getMetadataPath)
         .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
                        lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
       val segmentIds = loadMetadataDetails.map(_.getLoadName)
       val segmentIdIndex = segmentIds.zipWithIndex.toMap
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getTablePath,
-        carbonTable.getCarbonTableIdentifier)
       val segmentId2maxTaskNo = segmentIds.map { segId =>
-        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
+        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath))
       }.toMap
 
       class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a4fa37a..1210b92 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -139,7 +139,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
     val carbonMergerMapping = CarbonMergerMapping(
       tablePath,
-      carbonTable.getMetaDataFilepath,
+      carbonTable.getMetadataPath,
       mergedLoadName,
       databaseName,
       factTableName,
@@ -151,7 +151,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       currentPartitions = partitions)
     carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
     carbonLoadModel.setLoadMetadataDetails(
-      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath).toList.asJava)
     // trigger event for compaction
     val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
       AlterTableCompactionPreEvent(sqlContext.sparkSession,
@@ -237,11 +237,11 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
          CarbonDataMergerUtil
            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
-             carbonTable.getMetaDataFilepath,
+             carbonTable.getMetadataPath,
              carbonLoadModel)) ||
         CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
           loadsToMerge,
-            carbonTable.getMetaDataFilepath,
+            carbonTable.getMetadataPath,
             mergedLoadNumber,
           carbonLoadModel,
           compactionType,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 9b9ca0e..acc3358 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,12 +34,12 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -292,9 +292,10 @@ case class CarbonAlterTableCompactionCommand(
           true)(sparkSession,
           sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
         // 5. remove checkpoint
-        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingCheckpointDir))
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingLogDir))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
       } else {
         val msg = "Failed to close streaming table, because streaming is locked for table " +
                   carbonTable.getDatabaseName() + "." + carbonTable.getTableName()

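Closing a streaming table above now derives both streaming directories from the table path. A sketch of just that cleanup step, assuming the directory helpers this patch introduces (helper name illustrative):

import java.io.File

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath

// Illustrative only: delete the streaming checkpoint and log directories.
def removeStreamingDirs(tablePath: String): Unit = {
  FileFactory.deleteAllFilesOfDir(
    new File(CarbonTablePath.getStreamingCheckpointDir(tablePath)))
  FileFactory.deleteAllFilesOfDir(
    new File(CarbonTablePath.getStreamingLogDir(tablePath)))
}
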
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index fc8e975..a42031d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -60,7 +60,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
@@ -258,8 +260,7 @@ case class CarbonLoadDataCommand(
           carbonLoadModel.setUseOnePass(false)
         }
         // Create table and metadata folders if not exist
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-        val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
         val fileType = FileFactory.getFileType(metadataDirectoryPath)
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
           FileFactory.mkdirs(metadataDirectoryPath, fileType)
@@ -354,9 +355,7 @@ case class CarbonLoadDataCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
-    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dictFolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
     val dimensions = carbonTable.getDimensionByTableName(
       carbonTable.getTableName).asScala.toArray
     val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -1041,4 +1040,5 @@ case class CarbonLoadDataCommand(
     val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
     (dataFrameWithTupleId)
   }
+
 }

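The load command above keeps the same create-if-missing guard for the table's Metadata folder, now built from the static path helper. The guard in isolation (helper name illustrative):

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.path.CarbonTablePath

// Illustrative only: create the metadata directory when it does not exist.
def ensureMetadataDir(tablePath: String): Unit = {
  val metadataDirectoryPath = CarbonTablePath.getMetadataPath(tablePath)
  val fileType = FileFactory.getFileType(metadataDirectoryPath)
  if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
    FileFactory.mkdirs(metadataDirectoryPath, fileType)
  }
}
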
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index f8f215f..1e5885e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -45,7 +45,7 @@ case class CarbonShowLoadsCommand(
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
       limit,
-      carbonTable.getMetaDataFilepath
+      carbonTable.getMetadataPath
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index e3e4c7a..a8316e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
 import org.apache.carbondata.hadoop.util.SchemaReader
 
@@ -63,19 +63,18 @@ case class RefreshCarbonTableCommand(
     // 2.2.1 validate that all the aggregate tables are copied at the store location.
     // 2.2.2 Register the aggregate tables
     val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
     // 2.1 check if the table already register with hive then ignore and continue with the next
     // schema
     if (!sparkSession.sessionState.catalog.listTables(databaseName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
       // check the existence of the schema file to know its a carbon table
-      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
       // if schema file does not exist then the table will either non carbon table or stale
       // carbon table
       if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
         // read TableInfo
-        val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
+        val tableInfo = SchemaReader.getTableInfo(identifier)
        // 2.2 register the table with the hive check if the table being registered has
        // aggregate table then do the below steps
        // 2.2.1 validate that all the aggregate tables are copied at the store location.
@@ -99,7 +98,7 @@ case class RefreshCarbonTableCommand(
         // Register partitions to hive metastore in case of hive partitioning carbon table
         if (tableInfo.getFactTable.getPartitionInfo != null &&
             tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
-          registerAllPartitionsToHive(absoluteTableIdentifier, sparkSession)
+          registerAllPartitionsToHive(identifier, sparkSession)
         }
       } else {
         LOGGER.audit(
@@ -178,9 +177,7 @@ case class RefreshCarbonTableCommand(
     dataMapSchemaList.asScala.foreach(dataMap => {
       val tableName = dataMap.getChildSchema.getTableName
       val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath,
-        new CarbonTableIdentifier(dbName, tableName, dataMap.getChildSchema.getTableId))
-      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
       try {
         fileExist = FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
       } catch {
@@ -191,7 +188,7 @@ case class RefreshCarbonTableCommand(
         return fileExist;
       }
     })
-    return true
+    true
   }
 
   /**
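RefreshCarbonTableCommand now derives the schema file location directly from the table path before deciding whether a directory holds a carbon table. A small sketch of that existence check, assuming CarbonTablePath and FileFactory from carbondata-core:

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.util.path.CarbonTablePath

    object SchemaFileCheck {
      // True if a carbon schema file exists under the given table path.
      def isCarbonTable(tablePath: String): Boolean = {
        val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
        FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
      }
    }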

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 25d5e91..10d55ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.processing.exception.MultipleMatchingException
@@ -68,12 +68,11 @@ object DeleteExecution {
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val isPartitionTable = carbonTable.isHivePartitionTable
     val factPath = if (isPartitionTable) {
-      carbonTablePath.getPath
+      absoluteTableIdentifier.getTablePath
     } else {
-      carbonTablePath.getFactDir
+      CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
     }
     var segmentsTobeDeleted = Seq.empty[Segment]
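DeleteExecution resolves the fact path the same way, from the identifier's table path rather than through a CarbonTablePath instance. A sketch of that branch under the same assumptions (the helper name factPath is hypothetical):

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.core.util.path.CarbonTablePath

    object FactPathExample {
      // Hive-partitioned tables keep data under the table root; others under the Fact directory.
      def factPath(identifier: AbsoluteTableIdentifier, isPartitionTable: Boolean): String =
        if (isPartitionTable) {
          identifier.getTablePath
        } else {
          CarbonTablePath.getFactDir(identifier.getTablePath)
        }
    }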
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 38ac58e..c1f86ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -39,7 +39,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.DropPartitionCallable
@@ -65,8 +64,8 @@ case class CarbonAlterTableDropPartitionCommand(
     if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) {
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     if (partitionInfo == null) {
       throwMetadataException(dbName, tableName, "table is not a partition table")
     }
@@ -92,10 +91,9 @@ case class CarbonAlterTableDropPartitionCommand(
           "Dropping range interval partition is unsupported")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
 
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -108,11 +106,11 @@ case class CarbonAlterTableDropPartitionCommand(
     thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis)
     carbonMetaStore.updateTableSchemaForAlter(
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
       thriftTable,
       null,
-      table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     // sparkSession.catalog.refreshTable(tableName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 78cf2b8..efd1216 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -41,7 +41,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -72,8 +71,8 @@ case class CarbonAlterTableSplitPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
     // keep a copy of partitionIdList before update partitionInfo.
     // will be used in partition data scan
@@ -88,9 +87,8 @@ case class CarbonAlterTableSplitPartitionCommand(
 
     updatePartitionInfo(partitionInfo, partitionIds)
 
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -100,12 +98,12 @@ case class CarbonAlterTableSplitPartitionCommand(
     wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
     val thriftTable =
       schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-    carbonMetaStore
-      .updateTableSchemaForAlter(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        thriftTable,
-        null,
-        table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    carbonMetaStore.updateTableSchemaForAlter(
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      thriftTable,
+      null,
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 59c43aa..4d0a4c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -184,7 +184,7 @@ case class CreatePreAggregateTableCommand(
       CarbonFilters.getCurrentPartitions(sparkSession,
       TableIdentifier(parentTable.getTableName,
         Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
     if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
       throw new UnsupportedOperationException(
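The load check above reads the parent table's segment metadata through the new CarbonTable.getMetadataPath accessor. A sketch of the same guard, assuming SegmentStatusManager and SegmentStatus from carbondata-core (the helper name hasInProgressLoad is hypothetical):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}

    object InProgressLoadCheck {
      // True if any segment of the table is still being written.
      def hasInProgressLoad(table: CarbonTable): Boolean =
        SegmentStatusManager.readLoadMetadata(table.getMetadataPath).exists { load =>
          load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
          load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
        }
    }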

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 657e0c5..11f451b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -290,13 +290,12 @@ object CommitPreAggregateListener extends OperationEventListener {
       // keep committing until one fails
       val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
         val childCarbonTable = childLoadCommand.table
-        val carbonTablePath =
-          new CarbonTablePath(childCarbonTable.getCarbonTableIdentifier,
-            childCarbonTable.getTablePath)
         // Generate table status file name with UUID, forExample: tablestatus_1
-        val oldTableSchemaPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+        val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+          childCarbonTable.getTablePath, uuid)
         // Generate table status file name without UUID, forExample: tablestatus
-        val newTableSchemaPath = carbonTablePath.getTableStatusFilePath
+        val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+          childCarbonTable.getTablePath)
         renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
       }
       // if true then the commit for one of the child tables has failed
@@ -306,11 +305,10 @@ object CommitPreAggregateListener extends OperationEventListener {
         renamedDataMaps.foreach {
           loadCommand =>
             val carbonTable = loadCommand.table
-            val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-              carbonTable.getTablePath)
             // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
-            val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath + "_backup_" + uuid
-            val tableSchemaPath = carbonTablePath.getTableStatusFilePath
+            val backupTableSchemaPath =
+              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
+            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
             markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, loadCommand)
             renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
         }
@@ -377,9 +375,8 @@ object CommitPreAggregateListener extends OperationEventListener {
       operationContext: OperationContext,
       uuid: String): Unit = {
     childTables.foreach { childTable =>
-      val carbonTablePath = new CarbonTablePath(childTable.getCarbonTableIdentifier,
-        childTable.getTablePath)
-      val metaDataDir = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath)
+      val metaDataDir = FileFactory.getCarbonFile(
+        CarbonTablePath.getMetadataPath(childTable.getTablePath))
       val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = {
           file.getName.contains(uuid) || file.getName.contains("backup")
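CommitPreAggregateListener derives both the UUID-suffixed and the final tablestatus paths from the child table's path, matching the rename logic above. A sketch under the same assumptions (tablePath and uuid values are illustrative):

    import org.apache.carbondata.core.util.path.CarbonTablePath

    object TableStatusPaths {
      def main(args: Array[String]): Unit = {
        val tablePath = "/tmp/carbon/store/default/agg_t1" // illustrative
        val uuid = "1"                                      // illustrative
        // Transient status file written while the child-table commit is in flight
        val withUuid = CarbonTablePath.getTableStatusFilePathWithUUID(tablePath, uuid)
        // Final status file the transient one is renamed to on success
        val committed = CarbonTablePath.getTableStatusFilePath(tablePath)
        println(s"$withUuid -> $committed")
      }
    }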

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1bd12cd..ebf87d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,11 +16,12 @@
  */
 package org.apache.spark.sql.execution.command.preaaggregate
 
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
-import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
+import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF}
@@ -35,16 +36,12 @@ import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -427,9 +424,7 @@ object PreAggregateUtil {
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getAbsoluteTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,
@@ -518,6 +513,26 @@ object PreAggregateUtil {
     }
   }
 
+  /**
+   * This method reverts the changes to the schema if add column command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param numberOfChildSchema
+   * @param sparkSession
+   */
+  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
+    (sparkSession: SparkSession): Unit = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+    carbonTable.getTableLastUpdatedTime
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
+      metastore.revertTableSchemaForPreAggCreationFailure(
+        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
+    }
+  }
+
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 4b43ea7..064ba18 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -28,7 +28,6 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
@@ -64,9 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,
@@ -76,8 +73,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
-        carbonTablePath,
-        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getAbsoluteTableIdentifier,
         sparkSession.sparkContext).process
       // generate dictionary files for the newly added columns
       new AlterTableAddColumnRDD(sparkSession.sparkContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 571e23f..8b0d75f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
@@ -74,9 +73,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
         throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
       }
       // read the latest schema file
-      val carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null
@@ -84,8 +81,9 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       columnSchemaList.foreach { columnSchema =>
         if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
           deletedColumnSchema = columnSchema.deepCopy
-          columnSchema.setData_type(DataTypeConverterUtil
-            .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+          columnSchema.setData_type(
+            DataTypeConverterUtil.convertToThriftDataType(
+              alterTableDataTypeChangeModel.dataTypeInfo.dataType))
           columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
           columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
           addColumnSchema = columnSchema
@@ -97,8 +95,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
       tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
         .setTime_stamp(System.currentTimeMillis)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
+      AlterTableUtil.updateSchemaInfo(
+        carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
           sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
       val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
         new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 780ac8f..7bbefd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -99,10 +99,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)
 
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)(sparkSession)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala
