Repository: carbondata
Updated Branches:
  refs/heads/datamap e616162c0 -> 5a625f4ce


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
new file mode 100644
index 0000000..96e840f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+
+/**
+ * Helper class used to create a pre-aggregate table and update the parent
+ * table with information about the child table.
+ * The operation is atomic: either it succeeds, or nothing changes when it
+ * fails at either step:
+ * 1. failing to create the pre-aggregate table, or
+ * 2. failing to update the main table.
+ */
+case class PreAggregateTableHelper(
+    var parentTable: CarbonTable,
+    dataMapName: String,
+    dataMapClassName: String,
+    dataMapProperties: java.util.Map[String, String],
+    queryString: String,
+    timeSeriesFunction: String) {
+
+  var loadCommand: CarbonLoadDataCommand = _
+
+  def initMeta(sparkSession: SparkSession): Seq[Row] = {
+    val dmProperties = dataMapProperties.asScala
+    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+    val df = sparkSession.sql(updatedQuery)
+    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+      df.logicalPlan, queryString)
+    val fields = fieldRelationMap.keySet.toSeq
+    val tableProperties = mutable.Map[String, String]()
+    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
+    val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
+    if (!parentTable.getTableName.equalsIgnoreCase(selectTable.getTableName)) {
+      throw new MalformedDataMapCommandException(
+        "Parent table name is different in select and create")
+    }
+    var neworder = Seq[String]()
+    val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
+    parentOrder.foreach(parentcol =>
+      fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
+                           parentcol.equals(fieldRelationMap(col).
+                             columnTableRelationList.get(0).parentColumnName))
+        .map(cols => neworder :+= cols.column)
+    )
+    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
+    tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
+      getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
+      .LOAD_SORT_SCOPE_DEFAULT))
+    tableProperties
+      .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+    val tableIdentifier =
+      TableIdentifier(parentTable.getTableName + "_" + dataMapName,
+        Some(parentTable.getDatabaseName))
+    // prepare table model of the collected tokens
+    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
+      ifNotExistPresent = false,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+      tableIdentifier.table.toLowerCase,
+      fields,
+      Seq(),
+      tableProperties,
+      None,
+      isAlterFlow = false,
+      None)
+
+    // updating the relation identifier, this will be stored in child table
+    // which can be used during dropping of the pre-aggregate table, as the parent
+    // table will also get updated
+    if (timeSeriesFunction != null) {
+      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties.toMap, parentTable)
+      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
+        fieldRelationMap,
+        dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME))
+      TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap,
+        dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME),
+        timeSeriesFunction)
+    }
+    tableModel.parentTable = Some(parentTable)
+    tableModel.dataMapRelation = Some(fieldRelationMap)
+    val tablePath = if (dmProperties.contains("path")) {
+      dmProperties("path")
+    } else {
+      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+    }
+    CarbonCreateTableCommand(TableNewProcessor(tableModel),
+      tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession)
+
+    val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+    val tableInfo = table.getTableInfo
+
+    // child schema object will be saved on parent table schema
+    val childSchema = tableInfo.getFactTable.buildChildSchema(
+      dataMapName,
+      DataMapProvider.PREAGGREGATE.toString,
+      tableInfo.getDatabaseName,
+      queryString,
+      "AGGREGATION")
+    dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+
+    // updating the parent table about child table
+    PreAggregateUtil.updateMainTable(parentTable, childSchema, sparkSession)
+
+    // After updating the parent carbon table with the data map entry, extract the
+    // latest table object to be used in the further create process.
+    parentTable = CarbonEnv.getCarbonTable(Some(parentTable.getDatabaseName),
+      parentTable.getTableName)(sparkSession)
+
+    val updatedLoadQuery = if (timeSeriesFunction != null) {
+      val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
+        .filter(p => p.getDataMapName.equalsIgnoreCase(dataMapName))
+        .head
+        .asInstanceOf[AggregationDataMapSchema]
+      PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
+        parentTable.getTableName,
+        parentTable.getDatabaseName)
+    } else {
+      queryString
+    }
+    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+      updatedLoadQuery)).drop("preAggLoad")
+    loadCommand = PreAggregateUtil.createLoadCommandForChild(
+      tableInfo.getFactTable.getListOfColumns,
+      tableIdentifier,
+      dataFrame,
+      isOverwrite = false,
+      sparkSession = sparkSession)
+    loadCommand.processMetadata(sparkSession)
+    Seq.empty
+  }
+
+  def initData(sparkSession: SparkSession): Seq[Row] = {
+    // load child table if parent table has existing segments
+    // This will be used to check if the parent table has any segments or not. If not,
+    // there is no need to fire a load for the pre-aggregate table. Therefore read the
+    // load details of the PARENT table.
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
+    if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
+      load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
+      throw new UnsupportedOperationException(
+        "Cannot create pre-aggregate table when insert is in progress on main 
table")
+    } else if (loadAvailable.nonEmpty) {
+      val updatedQuery = if (timeSeriesFunction != null) {
+        val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
+          .filter(p => p.getDataMapName
+            .equalsIgnoreCase(dataMapName)).head
+          .asInstanceOf[AggregationDataMapSchema]
+        PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
+          parentTable.getTableName,
+          parentTable.getDatabaseName)
+      } else {
+        queryString
+      }
+      // Passing segmentToLoad as * because we want to load all the segments into the
+      // pre-aggregate table even if the user has set some segments on the parent table.
+      loadCommand.dataFrame = Some(PreAggregateUtil
+        .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
+      PreAggregateUtil.startDataLoadForDataMap(
+        parentTable,
+        segmentToLoad = "*",
+        validateSegments = true,
+        sparkSession,
+        loadCommand)
+    }
+    Seq.empty
+  }
+}
+
+
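As a reading aid, here is a hedged sketch of how a DDL command might drive the two-phase helper above: initMeta creates the child table and registers it on the parent schema, while initData backfills it from the parent's existing segments. Only PreAggregateTableHelper and its two methods come from the diff; the wrapper object, its parameter names, and the "preaggregate" class name are illustrative assumptions.

import java.util

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper

import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Hypothetical driver object, not part of this commit.
object PreAggregateHelperUsageSketch {
  def createAndLoad(
      parentTable: CarbonTable,
      dataMapName: String,
      properties: util.Map[String, String],
      selectQuery: String,
      sparkSession: SparkSession): Seq[Row] = {
    val helper = PreAggregateTableHelper(
      parentTable, dataMapName, "preaggregate", properties,
      selectQuery, timeSeriesFunction = null)
    // Phase 1: create the pre-aggregate table and update the parent schema.
    helper.initMeta(sparkSession)
    // Phase 2: load existing parent segments into the new child table.
    helper.initData(sparkSession)
  }
}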

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1073f63..845e30d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
@@ -405,11 +405,11 @@ object PreAggregateUtil {
    * Below method will be used to update the main table about the pre aggregate table
    * information. In case of any exception it will throw an error so the pre aggregate
    * table creation will fail.
    *
-   * @param childSchema
-   * @param sparkSession
+   * @return the existing TableInfo object before the update; it can be used to
+   *         recover if any later operation fails
    */
   def updateMainTable(carbonTable: CarbonTable,
-      childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
+      childSchema: DataMapSchema, sparkSession: SparkSession): TableInfo = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.DROP_TABLE_LOCK)
@@ -422,7 +422,7 @@ object PreAggregateUtil {
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,
@@ -435,11 +435,11 @@ object PreAggregateUtil {
         throw new MetadataProcessException("DataMap name already exist")
       }
       wrapperTableInfo.getDataMapSchemaList.add(childSchema)
-      val thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-      updateSchemaInfo(carbonTable,
-        thriftTable)(sparkSession)
+      val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(
+        wrapperTableInfo, dbName, tableName)
+      updateSchemaInfo(carbonTable, thriftTable)(sparkSession)
       LOGGER.info(s"Parent table updated is successful for table 
$dbName.$tableName")
+      thriftTableInfo
     } catch {
       case e: Exception =>
         LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
@@ -449,7 +449,6 @@ object PreAggregateUtil {
       // release lock after command execution completion
       releaseLocks(locks)
     }
-    Seq.empty
   }
 
   /**
@@ -525,7 +524,7 @@ object PreAggregateUtil {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     carbonTable.getTableLastUpdatedTime
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
       metastore.revertTableSchemaForPreAggCreationFailure(
         carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
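To illustrate what the new updateMainTable return value enables, here is a hedged sketch: the caller keeps the pre-update thrift TableInfo and, if a later step fails, restores the parent schema through the revert call visible in the hunk above. The wrapper method and its error handling are illustrative assumptions, not code from this commit.

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil

import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}

// Hypothetical wrapper, not part of this commit.
object UpdateMainTableRecoverySketch {
  def updateWithRecovery(
      parentTable: CarbonTable,
      childSchema: DataMapSchema,
      sparkSession: SparkSession): Unit = {
    // updateMainTable now returns the thrift TableInfo as it was before the update
    val previousTableInfo =
      PreAggregateUtil.updateMainTable(parentTable, childSchema, sparkSession)
    try {
      // ... remaining steps of the pre-aggregate creation would run here ...
    } catch {
      case e: Exception =>
        // assumed recovery path: revert the parent schema using the saved snapshot
        CarbonEnv.getInstance(sparkSession).carbonMetastore
          .revertTableSchemaForPreAggCreationFailure(
            parentTable.getAbsoluteTableIdentifier, previousTableInfo)(sparkSession)
        throw e
    }
  }
}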

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 07917d0..07d693b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -63,7 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index fa8003e..51c8ec8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -74,7 +74,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
         sys.error(s"Invalid Column: $columnName")
       }
       // read the latest schema file
-      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index d848eb5..fcc1095 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -99,7 +99,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
 
       // read the latest schema file
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTable)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index fc780cb..cda78ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -108,7 +108,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTable)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 9e0cee5..e67a98f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -63,13 +63,13 @@ object TimeSeriesUtil {
   * @return whether only one granularity is found
    */
   def validateTimeSeriesGranularity(
-      dmProperties: Map[String, String],
+      dmProperties: java.util.Map[String, String],
       dmClassName: String): Boolean = {
     var isFound = false
 
     // 1. only one granularity is supported
     for (granularity <- Granularity.values()) {
-      if (dmProperties.get(granularity.getName).isDefined) {
+      if (dmProperties.containsKey(granularity.getName)) {
         if (isFound) {
           throw new MalformedDataMapCommandException(
             s"Only one granularity level can be defined")
@@ -104,14 +104,14 @@ object TimeSeriesUtil {
    * @return key and value tuple
    */
   def getTimeSeriesGranularityDetails(
-      dmProperties: Map[String, String],
+      dmProperties: java.util.Map[String, String],
       dmClassName: String): (String, String) = {
 
     val defaultValue = "1"
     for (granularity <- Granularity.values()) {
-      if (dmProperties.get(granularity.getName).isDefined &&
-        dmProperties.get(granularity.getName).get.equalsIgnoreCase(defaultValue)) {
-        return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName).get)
+      if (dmProperties.containsKey(granularity.getName) &&
+        dmProperties.get(granularity.getName).equalsIgnoreCase(defaultValue)) {
+        return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName))
       }
     }
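With dmProperties now typed as java.util.Map, a caller can pass the DataMap properties straight through without converting to a Scala Map first. A minimal hedged sketch follows; the property keys and the "timeseries" class name are assumptions for illustration (the real granularity keys come from the Granularity enum):

import java.util

import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil

object TimeSeriesGranularitySketch {
  def demo(): Unit = {
    val dmProperties = new util.HashMap[String, String]()
    dmProperties.put("event_time", "order_time")   // assumed key and column
    dmProperties.put("hour_granularity", "1")      // assumed granularity key

    // returns true when exactly one granularity is defined; throws on more than one
    if (TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, "timeseries")) {
      // yields the matching (granularity, value) pair
      val (granularity, value) =
        TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, "timeseries")
      println(s"granularity=$granularity, value=$value")
    }
  }
}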
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index fd09e48..b3438a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -523,8 +523,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
     metadata.carbonTables
 
-  override def getThriftTableInfo(carbonTable: CarbonTable)
-    (sparkSession: SparkSession): TableInfo = {
+  override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = {
     val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 44f731e..4c40fee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -96,8 +96,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     Seq()
   }
 
-  override def getThriftTableInfo(carbonTable: CarbonTable)
-    (sparkSession: SparkSession): format.TableInfo = {
+  override def getThriftTableInfo(carbonTable: CarbonTable): format.TableInfo = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
       carbonTable.getDatabaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 0645040..c2333f9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -143,7 +143,7 @@ trait CarbonMetaStore {
 
   def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
 
-  def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo
+  def getThriftTableInfo(carbonTable: CarbonTable): TableInfo
 
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
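For callers across the alter-table commands in this commit, the net effect of the getThriftTableInfo change is just the dropped curried parameter list. A small hedged sketch, where the database and table names are placeholders:

import org.apache.spark.sql.{CarbonEnv, SparkSession}

object GetThriftTableInfoSketch {
  def readSchema(sparkSession: SparkSession): Unit = {
    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
    // "default" and "t1" are placeholder names
    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "t1")(sparkSession)
    // before this commit: metastore.getThriftTableInfo(carbonTable)(sparkSession)
    // after: the SparkSession parameter list is gone
    val thriftTableInfo = metastore.getThriftTableInfo(carbonTable)
  }
}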
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index aaa87a3..9c2c7e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -192,7 +192,7 @@ object AlterTableUtil {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val fileType = FileFactory.getFileType(tablePath)
     if (FileFactory.isFileExist(tablePath, fileType)) {
-      val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession)
+      val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)
       val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
       val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
       if (updatedTime == timeStamp) {
@@ -221,7 +221,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -246,7 +246,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -277,7 +277,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -326,7 +326,7 @@ object AlterTableUtil {
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       // get the latest carbon table
       // read the latest schema file
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5a625f4c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index e817590..f9baab3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -30,13 +30,13 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
- * It is for writing DataMap for one table
+ * It is for writing IndexDataMap for one table
  */
 public class DataMapWriterListener {
 
@@ -54,7 +54,7 @@ public class DataMapWriterListener {
     List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
-        DataMapFactory factory = tableDataMap.getDataMapFactory();
+        IndexDataMapFactory factory = tableDataMap.getIndexDataMapFactory();
         register(factory, segmentId, dataWritePath);
       }
     }
@@ -63,7 +63,7 @@ public class DataMapWriterListener {
   /**
    * Register an AbstractDataMapWriter
    */
-  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
+  private void register(IndexDataMapFactory factory, String segmentId, String dataWritePath) {
     assert (factory != null);
     assert (segmentId != null);
     DataMapMeta meta = factory.getMeta();
