[CARBONDATA-1739] Clean up store path interface

This closes #1509
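
Summary of the cleanup, grounded in the diff below: CarbonTable.getFactTableName() is renamed to
getTableName() across all modules, a static CarbonProperties.getStorePath() helper is added for
reading CarbonCommonConstants.STORE_LOCATION, and unused store-path plumbing (TableMeta, FileData,
FileManager, IFileManagerComposite, CarbonDataWriterVo) is deleted. A minimal sketch of the
resulting API; the "table" variable and table name here are hypothetical, for illustration only:

    // hypothetical usage sketch, not part of the patch itself
    CarbonTable table = CarbonMetadata.getInstance().getCarbonTable("default_sampleTable");
    String tableName = table.getTableName();             // was table.getFactTableName()
    String storePath = CarbonProperties.getStorePath();  // new static helper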


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5fc7f06f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5fc7f06f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5fc7f06f

Branch: refs/heads/master
Commit: 5fc7f06f23e944719b2735b97176d68fe209ad75
Parents: b6777fc
Author: Jacky Li <jacky.li...@qq.com>
Authored: Thu Nov 16 19:41:19 2017 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Fri Nov 17 14:46:19 2017 +0800

----------------------------------------------------------------------
 .../dictionary/ManageDictionaryAndBTree.java    |   2 +-
 .../core/metadata/CarbonMetadata.java           |   2 +-
 .../core/metadata/schema/table/CarbonTable.java |   4 +-
 .../core/mutate/CarbonUpdateUtil.java           |   8 +-
 .../carbondata/core/scan/model/QueryModel.java  |   4 +-
 .../carbondata/core/util/CarbonProperties.java  |   7 +
 .../core/metadata/CarbonMetadataTest.java       |   9 +-
 .../metadata/schema/table/CarbonTableTest.java  |   3 +-
 .../table/CarbonTableWithComplexTypesTest.java  |   2 +-
 .../carbondata/examples/StreamExample.scala     |   4 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   2 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   4 +-
 .../streaming/CarbonStreamRecordReader.java     |  10 +-
 .../streaming/CarbonStreamRecordWriter.java     |   4 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |   6 +-
 .../hadoop/test/util/StoreCreator.java          |   4 +-
 .../presto/impl/CarbonTableReader.java          |   2 +-
 .../presto/util/CarbonDataStoreCreator.scala    |   4 +-
 .../TestPreAggregateTableSelection.scala        |   2 +-
 .../partition/TestDDLForPartitionTable.scala    |   6 +-
 ...ForPartitionTableWithDefaultProperties.scala |   8 +-
 .../carbondata/spark/load/ValidateUtil.scala    |   4 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   2 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../spark/rdd/PartitionSplitter.scala           |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |  32 +----
 .../carbondata/spark/util/DataLoadingUtil.scala |   8 +-
 .../spark/util/GlobalDictionaryUtil.scala       |  12 +-
 .../command/carbonTableSchemaCommon.scala       |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  12 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |  18 ++-
 .../spark/sql/CarbonDataFrameWriter.scala       |   3 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   4 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  16 +--
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  36 ++++-
 .../scala/org/apache/spark/sql/CarbonScan.scala |   6 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  20 +--
 .../command/CarbonCreateTableCommand.scala      |   4 +-
 .../CarbonDescribeFormattedCommand.scala        |  20 +--
 .../command/CarbonDropTableCommand.scala        |   9 +-
 .../datamap/CarbonDataMapShowCommand.scala      |   4 +-
 .../datamap/CarbonDropDataMapCommand.scala      |  31 +++--
 .../AlterTableCompactionCommand.scala           |  12 +-
 .../management/CarbonShowLoadsCommand.scala     |   4 +-
 .../command/management/CleanFilesCommand.scala  |  10 +-
 .../management/DeleteLoadByIdCommand.scala      |   4 +-
 .../DeleteLoadByLoadDateCommand.scala           |   4 +-
 .../management/LoadTableByInsertCommand.scala   |   2 +-
 .../command/management/LoadTableCommand.scala   |  62 ++++-----
 .../command/mutation/DeleteExecution.scala      |  13 +-
 .../command/mutation/HorizontalCompaction.scala |   8 +-
 .../command/mutation/IUDCommonUtil.scala        |   2 +-
 .../mutation/ProjectForDeleteCommand.scala      |   7 +-
 .../mutation/ProjectForUpdateCommand.scala      |  11 +-
 .../AlterTableDropCarbonPartitionCommand.scala  |  19 +--
 .../AlterTableSplitCarbonPartitionCommand.scala |  19 +--
 .../partition/ShowCarbonPartitionsCommand.scala |   7 +-
 .../CreatePreAggregateTableCommand.scala        |   7 +-
 .../preaaggregate/PreAggregateListeners.scala   |   6 +-
 .../preaaggregate/PreAggregateUtil.scala        |  37 +++---
 .../CarbonAlterTableAddColumnCommand.scala      |   4 +-
 .../CarbonAlterTableDataTypeChangeCommand.scala |   4 +-
 .../CarbonAlterTableDropColumnCommand.scala     |   4 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |   7 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   4 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  11 +-
 .../strategy/StreamingTableStrategy.scala       |   3 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  61 ++++-----
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  13 +-
 .../apache/spark/sql/hive/CarbonMetaStore.scala |   4 +-
 .../sql/hive/CarbonPreAggregateRules.scala      |   2 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  26 ++--
 .../spark/sql/hive/CarbonSessionState.scala     |  13 +-
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  36 ++---
 .../org/apache/spark/util/CleanFiles.scala      |   5 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   5 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   4 +-
 .../partition/TestAlterPartitionTable.scala     |  32 ++---
 .../spark/util/AllDictionaryTestCase.scala      |  16 +--
 .../spark/util/DictionaryTestCaseUtil.scala     |   6 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  16 +--
 .../loading/DataLoadProcessBuilder.java         |   6 +-
 .../merger/CarbonCompactionExecutor.java        |   4 +-
 .../processing/merger/CarbonCompactionUtil.java |   4 +-
 .../processing/merger/CarbonDataMergerUtil.java |   8 +-
 .../carbondata/processing/merger/TableMeta.java |  42 ------
 .../spliter/AbstractCarbonQueryExecutor.java    |   4 +-
 .../partition/spliter/RowResultProcessor.java   |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 130 -------------------
 .../processing/store/file/FileData.java         |  52 --------
 .../processing/store/file/FileManager.java      |  59 ---------
 .../store/file/IFileManagerComposite.java       |  57 --------
 .../store/writer/AbstractFactDataWriter.java    |   4 -
 .../store/writer/CarbonDataWriterVo.java        |  65 ----------
 .../util/CarbonDataProcessorUtil.java           |   2 +-
 .../processing/util/CarbonLoaderUtil.java       |   5 +
 .../carbondata/processing/StoreCreator.java     |   4 +-
 .../streaming/segment/StreamSegment.java        |  16 +--
 .../streaming/StreamSinkFactory.scala           |   2 +-
 .../CarbonStreamingQueryListener.scala          |   6 +-
 102 files changed, 423 insertions(+), 911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
index a6c89e0..f8d2495 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionaryAndBTree.java
@@ -112,7 +112,7 @@ public class ManageDictionaryAndBTree {
     }
     // clear dictionary cache from LRU cache
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dimension : dimensions) {
       removeDictionaryColumnFromCache(carbonTable.getAbsoluteTableIdentifier(),
           dimension.getColumnId());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 75fe78b..2face7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -120,7 +120,7 @@ public final class CarbonMetadata {
  public CarbonDimension getCarbonDimensionBasedOnColIdentifier(CarbonTable carbonTable,
       String columnIdentifier) {
     List<CarbonDimension> listOfCarbonDims =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     for (CarbonDimension dimension : listOfCarbonDims) {
       if (dimension.getColumnId().equals(columnIdentifier)) {
         return dimension;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f76ddc9..ac580cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -347,7 +347,7 @@ public class CarbonTable implements Serializable {
   /**
    * @return the tabelName
    */
-  public String getFactTableName() {
+  public String getTableName() {
     return absoluteTableIdentifier.getCarbonTableIdentifier().getTableName();
   }
 
@@ -569,7 +569,7 @@ public class CarbonTable implements Serializable {
   }
 
   public boolean isPartitionTable() {
-    return null != tablePartitionMap.get(getFactTableName());
+    return null != tablePartitionMap.get(getTableName());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 29cf62a..0b531dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -208,7 +208,7 @@ public class CarbonUpdateUtil {
       lockStatus = carbonLock.lockWithRetries();
       if (lockStatus) {
         LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + 
table.getFactTableName()
+                "Acquired lock for table" + table.getDatabaseName() + "." + 
table.getTableName()
                         + " for table status updation");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
@@ -257,18 +257,18 @@ public class CarbonUpdateUtil {
         status = true;
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation 
for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
+                .getDatabaseName() + "." + table.getTableName());
       }
     } finally {
       if (lockStatus) {
         if (carbonLock.unlock()) {
           LOGGER.info(
                  "Table unlocked successfully after table status updation" + 
table.getDatabaseName()
-                          + "." + table.getFactTableName());
+                          + "." + table.getTableName());
         } else {
           LOGGER.error(
                   "Unable to unlock Table lock for table" + 
table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status 
updation");
+                          .getTableName() + " during table status updation");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 66dfa61..67b8681 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -122,7 +122,7 @@ public class QueryModel implements Serializable {
  public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
      CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) {
     QueryModel queryModel = new QueryModel();
-    String factTableName = carbonTable.getFactTableName();
+    String factTableName = carbonTable.getTableName();
     queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier);
 
     fillQueryModel(queryPlan, carbonTable, queryModel, factTableName);
@@ -141,7 +141,7 @@ public class QueryModel implements Serializable {
     if (null != queryPlan.getFilterExpression()) {
      boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
       boolean[] isFilterMeasures =
-          new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
+          new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
       processFilterExpression(queryPlan.getFilterExpression(),
           carbonTable.getDimensionByTableName(factTableName),
          carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 678a6f7..436950b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -482,6 +482,13 @@ public final class CarbonProperties {
   }
 
   /**
+   * Return the store path
+   */
+  public static String getStorePath() {
+    return getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION);
+  }
+
+  /**
    * This method will be used to get the properties value
    *
    * @param key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 0de160a..5361fb0 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -94,7 +93,7 @@ public class CarbonMetadataTest {
 
  @Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() {
    String expectedResult = "carbonTestTable";
-    assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getFactTableName());
+    assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName());
   }
 
  @Test public void testGetCarbonTableReturingProperTableWithProperTableUniqueName() {
@@ -171,7 +170,7 @@ public class CarbonMetadataTest {
     carbonDimensions.add(new CarbonDimension(colSchema1, 1, 1, 2, 1));
     carbonDimensions.add(new CarbonDimension(colSchema2, 2, 2, 2, 2));
     new MockUp<CarbonTable>() {
-      @Mock public String getFactTableName() {
+      @Mock public String getTableName() {
         return "carbonTestTable";
       }
 
@@ -200,7 +199,7 @@ public class CarbonMetadataTest {
     colSchema2.setColumnUniqueId("2");
     carbonChildDimensions.add(new CarbonDimension(colSchema3, 1, 1, 2, 1));
     new MockUp<CarbonTable>() {
-      @Mock public String getFactTableName() {
+      @Mock public String getTableName() {
         return "carbonTestTable";
       }
 
@@ -242,7 +241,7 @@ public class CarbonMetadataTest {
     carbonChildDimensions.add(new CarbonDimension(colSchema2, 1, 1, 2, 1));
 
     new MockUp<CarbonTable>() {
-      @Mock public String getFactTableName() {
+      @Mock public String getTableName() {
         return "carbonTestTable";
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
index 8b66233..a47b7fd 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -57,7 +56,7 @@ public class CarbonTableTest extends TestCase {
   }
 
   @Test public void testFactTableNameReturnsProperFactTableName() {
-    assertEquals("carbonTestTable", carbonTable.getFactTableName());
+    assertEquals("carbonTestTable", carbonTable.getTableName());
   }
 
   @Test public void testTableUniqueNameIsProper() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
index e9caf4a..84312cd 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableWithComplexTypesTest.java
@@ -55,7 +55,7 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
   }
 
   @Test public void testFactTableNameReturnsProperFactTableName() {
-    assertEquals("carbonTestTable", carbonTable.getFactTableName());
+    assertEquals("carbonTestTable", carbonTable.getTableName());
   }
 
   @Test public void testTableUniqueNameIsProper() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index 4b59aad..43d545d 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -89,9 +89,7 @@ object StreamExample {
              | """.stripMargin)
       }
 
-      val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.
-        lookupRelation(Some("default"), 
streamTableName)(spark).asInstanceOf[CarbonRelation].
-        tableMeta.carbonTable
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
       val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       // batch load
      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 0aa2974..88d8341 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -364,7 +364,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
       BitSet matchedPartitions = null;
-      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
       if (partitionInfo != null) {
         // prune partitions for filter query on partition table
        matchedPartitions = setMatchedPartitions(null, carbonTable, filter, partitionInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 6e840e2..552455a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -393,7 +393,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Expression filter = getFilterPredicates(job.getConfiguration());
     TableProvider tableProvider = new SingleTableProvider(carbonTable);
     // this will be null in case of corrupt schema file.
-    PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+    PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
 
     // prune partitions for filter query on partition table
@@ -787,7 +787,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Expression filter = getFilterPredicates(configuration);
    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
     boolean[] isFilterMeasures =
-        new boolean[carbonTable.getNumberOfMeasures(carbonTable.getFactTableName())];
+        new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
         isFilterMeasures);
     queryModel.setIsFilterDimensions(isFilterDimensions);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index a22461d..bdd7c28 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -153,13 +153,13 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     }
     carbonTable = model.getTable();
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     dimensionCount = dimensions.size();
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     measureCount = measures.size();
     List<CarbonColumn> carbonColumnList =
-        carbonTable.getStreamStorageOrderColumn(carbonTable.getFactTableName());
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
     storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
     isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
     directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
@@ -224,8 +224,8 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   private void initializeFilter() {
 
     List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
     int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
     for (int i = 0; i < dimLensWithComplex.length; i++) {
       dimLensWithComplex[i] = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 7df87e3..fdd0504 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -251,8 +251,8 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
   private void writeFileHeader() throws IOException {
     List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
     int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
     for (int i = 0; i < dimLensWithComplex.length; i++) {
       dimLensWithComplex[i] = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 630828a..3afad94 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -52,7 +52,7 @@ public class CarbonInputFormatUtil {
     if (columnString != null) {
       columns = columnString.split(",");
     }
-    String factTableName = carbonTable.getFactTableName();
+    String factTableName = carbonTable.getTableName();
    CarbonQueryPlan plan = new CarbonQueryPlan(carbonTable.getDatabaseName(), factTableName);
     // fill dimensions
     // If columns are null, set all dimensions and measures
@@ -120,9 +120,9 @@ public class CarbonInputFormatUtil {
  public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+        carbonTable.getMeasureByTableName(carbonTable.getTableName());
     QueryModel.processFilterExpression(filterExpression, dimensions, measures,
         isFilterDimensions, isFilterMeasures);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index b4145ef..c45f910 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -309,9 +309,9 @@ public class StoreCreator {
     String header = reader.readLine();
     String[] split = header.split(",");
     List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
-    List<CarbonDimension> dims = 
table.getDimensionByTableName(table.getFactTableName());
+    List<CarbonDimension> dims = 
table.getDimensionByTableName(table.getTableName());
     allCols.addAll(dims);
-    List<CarbonMeasure> msrs = 
table.getMeasureByTableName(table.getFactTableName());
+    List<CarbonMeasure> msrs = 
table.getMeasureByTableName(table.getTableName());
     allCols.addAll(msrs);
     Set<String>[] set = new HashSet[dims.size()];
     for (int i = 0; i < set.length; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 8e6abd4..f72bb7a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -363,7 +363,7 @@ public class CarbonTableReader {
        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier(), null).getPath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
     config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName());
-    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getFactTableName());
+    config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName());
 
     try {
       CarbonTableInputFormat.setTableInfo(config, tableInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 17a4188..1430baf 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -333,10 +333,10 @@ object CarbonDataStoreCreator {
     val split: Array[String] = header.split(",")
     val allCols: util.List[CarbonColumn] = new util.ArrayList[CarbonColumn]()
     val dims: util.List[CarbonDimension] =
-      table.getDimensionByTableName(table.getFactTableName)
+      table.getDimensionByTableName(table.getTableName)
     allCols.addAll(dims)
     val msrs: List[CarbonMeasure] =
-      table.getMeasureByTableName(table.getFactTableName)
+      table.getMeasureByTableName(table.getTableName)
     allCols.addAll(msrs)
     val set: Array[util.Set[String]] = Array.ofDim[util.Set[String]](dims.size)
     for (i <- set.indices) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 6b435c6..1d41664 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -147,7 +147,7 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
      case logicalRelation:LogicalRelation =>
        if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
          val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-          if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) {
+          if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) {
             isValidPlan = true
           }
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index 3f99922..df1bd2e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -51,7 +51,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
@@ -74,7 +74,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -101,7 +101,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
         |  'LIST_INFO'='0, 1, (2, 3)')
       """.stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("workgroupcategory"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.STRING)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
index 317e2e2..c17ca00 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
@@ -45,7 +45,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_hashTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
@@ -68,7 +68,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
       """.stripMargin)
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_rangeTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -96,7 +96,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         |  'DICTIONARY_INCLUDE'='projectenddate')
       """.stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTable")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
@@ -128,7 +128,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
         |  'LIST_INFO'='2017-06-11 , 2017-06-13')
       """.stripMargin)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_listTableDate")
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
     assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.DATE)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index 8eb5101..51e0cc4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -51,8 +51,8 @@ object ValidateUtil {
   def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {
     if (sortScope != null) {
       // Don't support use global sort on partitioned table.
-      if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null &&
-        sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
+      if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null &&
+          sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
         throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " +
           "table.")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 5c6760a..37ab8c3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -46,7 +46,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     val oldPartitionIds = alterPartitionModel.oldPartitionIds
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val databaseName = carbonTable.getDatabaseName
-    val factTableName = carbonTable.getFactTableName
+    val factTableName = carbonTable.getTableName
     val partitionInfo = carbonTable.getPartitionInfo(factTableName)
 
     override protected def getPartitions: Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 9ca21bc..0fed5a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -548,7 +548,7 @@ class PartitionTableDataLoaderRDD[K, V](
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
       val carbonTable = model.getCarbonDataLoadSchema.getCarbonTable
-      val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+      val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
       val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index a82ea00..2aa5610 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -41,7 +41,7 @@ object PartitionDropper {
     val dropWithData = dropPartitionCallableModel.dropWithData
     val carbonTable = dropPartitionCallableModel.carbonTable
     val dbName = carbonTable.getDatabaseName
-    val tableName = carbonTable.getFactTableName
+    val tableName = carbonTable.getTableName
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val partitionInfo = carbonTable.getPartitionInfo(tableName)
     val partitioner = PartitionFactory.getPartitioner(partitionInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index db664b3..9106cca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -40,7 +40,7 @@ object PartitionSplitter {
      val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel
      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-     val tableName = carbonTable.getFactTableName
+     val tableName = carbonTable.getTableName
      val databaseName = carbonTable.getDatabaseName
      val bucketInfo = carbonTable.getBucketingInfo(tableName)
      var finalSplitStatus = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 1b21e3d..a3572ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -55,7 +55,6 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
@@ -516,7 +515,6 @@ object CommonUtil {
   }
 
   def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel,
-      storePath: String,
       insertOverwrite: Boolean): Unit = {
     val newLoadMetaEntry = new LoadMetadataDetails
     val status = if (insertOverwrite) {
@@ -528,16 +526,13 @@ object CommonUtil {
     // reading the start time of data load.
     val loadStartTime = CarbonUpdateUtil.readCurrentTime
     model.setFactTimeStamp(loadStartTime)
-    CarbonLoaderUtil
-      .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp, false)
+    CarbonLoaderUtil.populateNewLoadMetaEntry(
+      newLoadMetaEntry, status, model.getFactTimeStamp, false)
     val entryAdded: Boolean =
       CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
     if (!entryAdded) {
-      sys
-        .error(s"Failed to add entry in table status for ${ 
model.getDatabaseName }.${
-          model
-            .getTableName
-        }")
+      sys.error(s"Failed to add entry in table status for " +
+                s"${ model.getDatabaseName }.${model.getTableName}")
     }
   }
 
@@ -856,26 +851,9 @@ object CommonUtil {
       CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
       CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
      new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
-        carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+        carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
         segmentIds).collect()
     }
   }
 
-  /**
-   * can be removed with the spark 1.6 removal
-   * @param tableMeta
-   * @return
-   */
-  @deprecated
-  def getTablePath(tableMeta: TableMeta): String = {
-    if (tableMeta.tablePath == null) {
-      tableMeta.storePath + CarbonCommonConstants.FILE_SEPARATOR +
-      tableMeta.carbonTableIdentifier.getDatabaseName +
-      CarbonCommonConstants.FILE_SEPARATOR + tableMeta.carbonTableIdentifier.getTableName
-    }
-    else {
-      tableMeta.tablePath
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 35e1e78..84ad85e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -180,10 +180,10 @@ object DataLoadingUtil {
       options: immutable.Map[String, String],
       optionsFinal: mutable.Map[String, String],
       carbonLoadModel: CarbonLoadModel): Unit = {
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTablePath(table.getTablePath)
-    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
@@ -199,7 +199,7 @@ object DataLoadingUtil {
     val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
     val all_dictionary_path = optionsFinal("all_dictionary_path")
     val column_dict = optionsFinal("columndict")
-    ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName)
+    ValidateUtil.validateDateFormat(dateFormat, table, table.getTableName)
     ValidateUtil.validateSortScope(table, sort_scope)
 
     if (bad_records_logger_enable.toBoolean ||
@@ -236,7 +236,7 @@ object DataLoadingUtil {
         }
       } else {
         if (fileHeader.isEmpty) {
-          fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+          fileHeader = table.getCreateOrderColumn(table.getTableName)
             .asScala.map(_.getColName).mkString(",")
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 975fc9b..0bf2b16 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -676,17 +676,17 @@ object GlobalDictionaryUtil {
    */
   def generateGlobalDictionary(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
+      tablePath: String,
       dataFrame: Option[DataFrame] = None): Unit = {
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
       // create dictionary folder if not exists
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
       val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
       // columns which need to generate global dictionary file
       val dimensions = carbonTable.getDimensionByTableName(
-        carbonTable.getFactTableName).asScala.toArray
+        carbonTable.getTableName).asScala.toArray
       // generate global dict from pre defined column dict file
       carbonLoadModel.initPredefDictMap()
 
@@ -701,7 +701,7 @@ object GlobalDictionaryUtil {
         if (colDictFilePath != null) {
           // generate predefined dictionary
            generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-            dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
+            dimensions, carbonLoadModel, sqlContext, tablePath, dictfolderPath)
         }
         if (headers.length > df.columns.length) {
           val msg = "The number of columns in the file header do not match the 
" +
@@ -717,7 +717,7 @@ object GlobalDictionaryUtil {
           // select column to push down pruning
           df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
          val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-            requireDimension, storePath, dictfolderPath, false)
+            requireDimension, tablePath, dictfolderPath, false)
           // combine distinct value in a block and partition by column
           val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
             .partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -731,7 +731,7 @@ object GlobalDictionaryUtil {
       } else {
         generateDictionaryFromDictionaryFiles(sqlContext,
           carbonLoadModel,
-          storePath,
+          tablePath,
           carbonTableIdentifier,
           dictfolderPath,
           dimensions,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 756de6b..2f6b277 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -454,7 +454,7 @@ class TableNewProcessor(cm: TableModel) {
       val field = cm.dimCols.find(keyDim equals _.column).get
       val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
         cm.parentTable.get.getColumnByName(
-          cm.parentTable.get.getFactTableName,
+          cm.parentTable.get.getTableName,
           cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
       } else {
         val encoders = new java.util.ArrayList[Encoding]()
@@ -479,7 +479,7 @@ class TableNewProcessor(cm: TableModel) {
         val encoders = if (cm.parentTable.isDefined &&
                            cm.dataMapRelation.get.get(field).isDefined) {
           cm.parentTable.get.getColumnByName(
-            cm.parentTable.get.getFactTableName,
+            cm.parentTable.get.getTableName,
             cm.dataMapRelation.get.get(field).get.
               columnTableRelation.get.parentColumnName).getEncoder
         } else {

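The getFactTableName to getTableName rename recurs through the rest of this patch; every per-table metadata lookup now keys off the one accessor. A sketch of the resulting pattern, assuming a resolved CarbonTable:

    // Single accessor for the table name, used by all lookups.
    val tableName = carbonTable.getTableName
    val dims = carbonTable.getDimensionByTableName(tableName)    // dimension columns
    val msrs = carbonTable.getMeasureByTableName(tableName)      // measure columns
    val partitionInfo = carbonTable.getPartitionInfo(tableName)  // null for non-partitioned tables
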
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1ca7456..c12d2ef 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -179,7 +179,7 @@ object CarbonDataRDDFactory {
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
                   s"${ tableForCompaction.getDatabaseName }." +
-                  s"${ tableForCompaction.getFactTableName}")
+                  s"${ tableForCompaction.getTableName}")
               val table: CarbonTable = tableForCompaction
               val metadataPath = table.getMetaDataFilepath
              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -204,7 +204,7 @@ object CarbonDataRDDFactory {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
                       s"${ tableForCompaction.getDatabaseName }." +
-                      s"${ tableForCompaction.getFactTableName }")
+                      s"${ tableForCompaction.getTableName }")
                // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
@@ -216,7 +216,7 @@ object CarbonDataRDDFactory {
                    skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier)
                    LOGGER.error("Compaction request file can not be deleted for table " +
                       s"${ tableForCompaction.getDatabaseName }." +
-                      s"${ tableForCompaction.getFactTableName }")
+                      s"${ tableForCompaction.getTableName }")
                 }
               }
               // ********* check again for all the tables.
@@ -248,7 +248,7 @@ object CarbonDataRDDFactory {
       table: CarbonTable
   ): CarbonLoadModel = {
     val loadModel = new CarbonLoadModel
-    loadModel.setTableName(table.getFactTableName)
+    loadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
     loadModel.setCarbonDataLoadSchema(dataLoadSchema)
@@ -319,7 +319,7 @@ object CarbonDataRDDFactory {
           }
         }
       } else {
-        status = if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+        status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
           loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel)
        } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
@@ -782,7 +782,7 @@ object CarbonDataRDDFactory {
       dataFrame: Option[DataFrame],
       carbonLoadModel: CarbonLoadModel): RDD[Row] = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
     val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
     val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
     val columns = carbonLoadModel.getCsvHeaderColumns

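For orientation, createCarbonLoadModel above builds the load model from the table alone. A minimal sketch, assuming a resolved CarbonTable; the setDatabaseName call is an assumption mirroring setTableName, not shown in this hunk:

    // Load model construction: table name plus a schema wrapping the CarbonTable.
    val loadModel = new CarbonLoadModel
    loadModel.setTableName(table.getTableName)
    loadModel.setDatabaseName(table.getDatabaseName) // assumed setter
    loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
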
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 1e6a36e..47f5344 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -21,23 +21,20 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
 
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.processing.merger.TableMeta
 
 case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
 
 object CarbonSparkUtil {
 
   def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
-    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+    val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getTableName)
         .asScala.map(x => x.getColName) // wf : may be problem
-    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+    val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getTableName)
         .asScala.map(x => x.getColName)
     val dictionary =
-      carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
+      carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
             f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
                 !f.getDataType.isComplexType)
@@ -47,10 +44,11 @@ object CarbonSparkUtil {
 
   def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = {
     val table = CarbonTable.buildFromTableInfo(tableInfo)
-    val meta = new TableMeta(table.getCarbonTableIdentifier,
-      table.getTablePath, tablePath, table)
-    CarbonRelation(tableInfo.getDatabaseName, tableInfo.getFactTable.getTableName,
-      CarbonSparkUtil.createSparkMeta(table), meta)
+    CarbonRelation(
+      tableInfo.getDatabaseName,
+      tableInfo.getFactTable.getTableName,
+      CarbonSparkUtil.createSparkMeta(table),
+      table)
   }
 
 }

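With TableMeta removed, createCarbonRelation hands the CarbonTable straight to the relation. A usage sketch, assuming a deserialized TableInfo and a known tablePath:

    // The relation now exposes the table directly; relation.tableMeta is gone.
    val relation = CarbonSparkUtil.createCarbonRelation(tableInfo, tablePath)
    val carbonTable = relation.carbonTable
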
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 44fbb37..b74576d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.types._
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 
 class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
@@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).storePath
+    val storePath = CarbonProperties.getStorePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)

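The session-scoped CarbonEnv.storePath field gives way to a static accessor, so the store location is read from one place without a SparkSession at hand. A sketch, assuming STORE_LOCATION was populated at session init (the CarbonEnv hunk below shows where):

    import org.apache.carbondata.core.util.CarbonProperties

    // Static lookup of the configured store location.
    val storePath: String = CarbonProperties.getStorePath
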
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 22933f2..72f40ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -50,7 +50,7 @@ case class CarbonDatasourceHadoopRelation(
   lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(paths.head,
     parameters("dbname"), parameters("tablename"))
   lazy val databaseName: String = carbonTable.getDatabaseName
-  lazy val tableName: String = carbonTable.getFactTableName
+  lazy val tableName: String = carbonTable.getTableName
   CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
 
   @transient lazy val carbonRelation: CarbonRelation =
@@ -58,7 +58,7 @@ case class CarbonDatasourceHadoopRelation(
     createCarbonRelation(parameters, identifier, sparkSession)
 
 
-  @transient lazy val carbonTable: CarbonTable = carbonRelation.tableMeta.carbonTable
+  @transient lazy val carbonTable: CarbonTable = carbonRelation.carbonTable
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c7db436..9d88c4c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -72,7 +72,7 @@ case class CarbonDictionaryDecoder(
     attachTree(this, "execute") {
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
-        (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+        (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier)
       }.toMap
 
       if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
@@ -125,7 +125,7 @@ case class CarbonDictionaryDecoder(
 
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
-      (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+      (carbonTable.getTableName, carbonTable.getAbsoluteTableIdentifier)
     }.toMap
 
     if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
@@ -323,7 +323,7 @@ object CarbonDictionaryDecoder {
       if (relation.isDefined && canBeDecoded(attr, profile)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension = carbonTable
-          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+          .getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -355,7 +355,7 @@ object CarbonDictionaryDecoder {
       if (relation.isDefined) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension = carbonTable
-          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+          .getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -432,12 +432,12 @@ object CarbonDictionaryDecoder {
       if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
-          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+          carbonTable.getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             !carbonDimension.isComplex) {
-          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+          (carbonTable.getTableName, carbonDimension.getColumnIdentifier,
             carbonDimension)
         } else {
           (null, null, null)
@@ -485,12 +485,12 @@ class CarbonDecoderRDD(
       if (relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
-          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+          carbonTable.getDimensionByName(carbonTable.getTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
             !carbonDimension.isComplex()) {
-          (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+          (carbonTable.getTableName, carbonDimension.getColumnIdentifier,
             carbonDimension)
         } else {
           (null, null, null)

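The decoder applies one guard over and over: a column participates in decoding only if it is a plain dictionary dimension. In sketch form, assuming a resolved CarbonTable and an attribute name attrName:

    // Decodable: dictionary-encoded, not direct-dictionary, not complex.
    val dim = carbonTable.getDimensionByName(carbonTable.getTableName, attrName)
    val decodable = dim != null &&
      dim.hasEncoding(Encoding.DICTIONARY) &&
      !dim.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
      !dim.isComplex
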
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 1ee7650..dcfce0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql
 import java.util.Map
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonSessionCatalog}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.internal.CarbonSQLConf
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
-import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationListenerBus}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -41,8 +43,6 @@ class CarbonEnv {
 
   var carbonSessionInfo: CarbonSessionInfo = _
 
-  var storePath: String = _
-
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -74,7 +74,7 @@ class CarbonEnv {
         config.addDefaultCarbonSessionParams()
         carbonMetastore = {
           val properties = CarbonProperties.getInstance()
-          storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+          var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
           if (storePath == null) {
             storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
             properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
@@ -112,6 +112,30 @@ object CarbonEnv {
       carbonEnv
     }
   }
-}
 
+  /**
+   * Return carbon table instance by looking up relation in `sparkSession`
+   */
+  def getCarbonTable(
+      databaseNameOp: Option[String],
+      tableName: String)
+    (sparkSession: SparkSession): CarbonTable = {
+    CarbonEnv
+      .getInstance(sparkSession)
+      .carbonMetastore
+      .lookupRelation(databaseNameOp, tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+  }
 
+  def getCarbonTable(
+      tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): CarbonTable = {
+    CarbonEnv
+      .getInstance(sparkSession)
+      .carbonMetastore
+      .lookupRelation(tableIdentifier)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+  }
+}

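The two new helpers fold the metastore lookup and the CarbonRelation cast into one call. A usage sketch, assuming an active CarbonSession; "default" and "t1" are illustrative names:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // By database option and table name:
    val byName = CarbonEnv.getCarbonTable(Some("default"), "t1")(sparkSession)
    // Or by Spark TableIdentifier:
    val byId = CarbonEnv.getCarbonTable(TableIdentifier("t1", Some("default")))(sparkSession)
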
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 0806421..99a7c37 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -65,11 +65,11 @@ case class CarbonScan(
       attributesRaw = attributeOut
     }
 
-    val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+    val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
     val colAttr = new Array[Attribute](columns.size())
     attributesRaw.foreach { attr =>
     val column =
-        carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+        carbonTable.getColumnByName(carbonTable.getTableName, attr.name)
       if(column != null) {
         colAttr(columns.indexOf(column)) = attr
        }
@@ -78,7 +78,7 @@ case class CarbonScan(
 
     var queryOrder: Integer = 0
     attributesRaw.foreach { attr =>
-      val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+      val carbonColumn = carbonTable.getColumnByName(carbonTable.getTableName, attr.name)
       if (carbonColumn != null) {
         if (carbonColumn.isDimension()) {
           val dim = new QueryDimension(attr.name)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index fba590e..6331f12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -165,7 +165,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       } else {
         CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(Option(dbName), tableName)(sparkSession)
-        (CarbonEnv.getInstance(sparkSession).storePath + s"/$dbName/$tableName", parameters)
+        (CarbonProperties.getStorePath + s"/$dbName/$tableName", parameters)
       }
     } catch {
       case ex: NoSuchTableException =>
@@ -199,11 +199,10 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       if (parameters.contains("tablePath")) {
         (parameters("tablePath"), parameters)
       } else if (!sparkSession.isInstanceOf[CarbonSession]) {
-        (CarbonEnv.getInstance(sparkSession).storePath + "/" + dbName + "/" + 
tableName, parameters)
+        (CarbonProperties.getStorePath + "/" + dbName + "/" + tableName, 
parameters)
       } else {
-        val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        (relation.tableMeta.tablePath, parameters)
+        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+        (carbonTable.getTablePath, parameters)
       }
     } catch {
       case ex: Exception =>
@@ -235,15 +234,11 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
     if (tablePathOption.isDefined) {
       val sparkSession = sqlContext.sparkSession
-      val identifier: AbsoluteTableIdentifier =
-        AbsoluteTableIdentifier.from(tablePathOption.get, dbName, tableName)
-      val carbonTable =
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.
-          createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
+      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
 
       if (!carbonTable.isStreamingTable) {
         throw new CarbonStreamException(s"Table 
${carbonTable.getDatabaseName}." +
-                                        s"${carbonTable.getFactTableName} is 
not a streaming table")
+                                        s"${carbonTable.getTableName} is not a 
streaming table")
       }
 
       // create sink
@@ -314,8 +309,7 @@ object CarbonSource {
     val tableName: String = properties.getOrElse("tableName", "").toLowerCase
     val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
     val tableInfo: TableInfo = TableNewProcessor(model)
-    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath)
     val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)

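Two resolution paths coexist in CarbonSource after this change: outside a CarbonSession the path is composed from the store location, while inside one the resolved table is authoritative. A sketch of both, under the same illustrative names:

    // Fallback: compose from the configured store location.
    val composed = CarbonProperties.getStorePath + "/" + dbName + "/" + tableName
    // CarbonSession: ask the table resolved through the metastore.
    val resolved = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession).getTablePath
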
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index 197b23b..f83766d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 
 case class CarbonCreateTableCommand(
     cm: TableModel,
@@ -37,7 +37,7 @@ case class CarbonCreateTableCommand(
   }
 
   override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    val storePath = CarbonProperties.getStorePath
     CarbonEnv.getInstance(sparkSession).carbonMetastore.
       checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
index 7dcad9a..b233c99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
@@ -29,6 +29,7 @@ import org.codehaus.jackson.map.ObjectMapper
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.util.CarbonProperties
 
 private[sql] case class CarbonDescribeFormattedCommand(
     child: SparkPlan,
@@ -68,7 +69,7 @@ private[sql] case class CarbonDescribeFormattedCommand(
       val colComment = field.getComment().getOrElse("null")
       val comment = if (dims.contains(fieldName)) {
         val dimension = relation.metaData.carbonTable.getDimensionByName(
-          relation.tableMeta.carbonTableIdentifier.getTableName, fieldName)
+          relation.carbonTable.getTableName, fieldName)
         if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) {
           colProps.append(fieldName).append(".")
             .append(mapper.writeValueAsString(dimension.getColumnProperties))
@@ -101,12 +102,11 @@ private[sql] case class CarbonDescribeFormattedCommand(
       colProps.toString()
     }
     results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
-    results ++= Seq(("Database Name: ", 
relation.tableMeta.carbonTableIdentifier
-      .getDatabaseName, "")
+    results ++= Seq(("Database Name: ", relation.carbonTable.getDatabaseName, 
"")
     )
-    results ++= Seq(("Table Name: ", 
relation.tableMeta.carbonTableIdentifier.getTableName, ""))
-    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
-    val carbonTable = relation.tableMeta.carbonTable
+    results ++= Seq(("Table Name: ", relation.carbonTable.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", CarbonProperties.getStorePath, ""))
+    val carbonTable = relation.carbonTable
     // Carbon table support table comment
     val tableComment = carbonTable.getTableInfo.getFactTable.getTableProperties
       .getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "")
@@ -122,14 +122,14 @@ private[sql] case class CarbonDescribeFormattedCommand(
       results ++= Seq(("ADAPTIVE", "", ""))
     }
     results ++= Seq(("SORT_COLUMNS", 
relation.metaData.carbonTable.getSortColumns(
-      relation.tableMeta.carbonTableIdentifier.getTableName).asScala
+      relation.carbonTable.getTableName).asScala
       .map(column => column).mkString(","), ""))
     val dimension = carbonTable
-      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+      .getDimensionByTableName(relation.carbonTable.getTableName)
     results ++= getColumnGroups(dimension.asScala.toList)
-    if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+    if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
       results ++=
-      Seq(("Partition Columns: ", 
carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+      Seq(("Partition Columns: ", 
carbonTable.getPartitionInfo(carbonTable.getTableName)
         .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
     }
     results.map {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5fc7f06f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 0343393..f0a916a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events._
 
@@ -50,12 +50,11 @@ case class CarbonDropTableCommand(
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val absoluteTableIdentifier = AbsoluteTableIdentifier
       .from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
@@ -68,7 +67,7 @@ case class CarbonDropTableCommand(
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       val carbonTable: Option[CarbonTable] =
         catalog.getTableFromMetadataCache(dbName, tableName) match {
-          case Some(tableMeta) => Some(tableMeta.carbonTable)
+          case Some(carbonTable) => Some(carbonTable)
           case None => try {
             Some(catalog.lookupRelation(identifier)(sparkSession)
               .asInstanceOf[CarbonRelation].metaData.carbonTable)
@@ -131,7 +130,7 @@ case class CarbonDropTableCommand(
     // delete the table folder
     val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
     val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonEnv.getInstance(sparkSession).storePath)
+      CarbonProperties.getStorePath)
     val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
     val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     val metadataFilePath =

Reply via email to