Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2103#discussion_r179950352
--- Diff: integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionStateWithoutHive.scala ---
@@ -0,0 +1,253 @@
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.{AlterTableVo, TableRenameVo}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the carbon
+ * table in the carbon catalog is not the same as the cached carbon relation's carbon table.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonInMemorySessionCatalog(
+ externalCatalog: ExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
+ functionRegistry: FunctionRegistry,
+ sparkSession: SparkSession,
+ conf: SQLConf,
+ hadoopConf: Configuration,
+ parser: ParserInterface,
+ functionResourceLoader: FunctionResourceLoader)
+ extends SessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser,
+ functionResourceLoader
+ ) with CarbonSessionCatalog {
+
+ override def alterTableRename(tableRenameVo: TableRenameVo): Unit = {
+ sparkSession.sessionState.catalog.renameTable(
+ tableRenameVo.oldTableIdentifier,
+ tableRenameVo.newTableIdentifier)
+ }
+
+ override def alterTable(alterTableVo: AlterTableVo) : Unit = {
+ // Not required in case of in-memory catalog
+ }
+
+ override def alterAddColumns(alterTableVo: AlterTableVo): Unit = {
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
+ alterTableVo.tableIdentifier)
+ val structType = catalogTable.schema
+ var newStructType = structType
+ alterTableVo.newColumns.get.foreach { cols =>
+ newStructType = newStructType
+ .add(cols.getColumnName, CarbonScalaUtil.convertCarbonToSparkDataType(cols.getDataType))
+ }
+ alterSchema(newStructType, catalogTable, alterTableVo.tableIdentifier)
+ }
+
+ override def alterDropColumns(alterTableVo: AlterTableVo): Unit = {
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
+ alterTableVo.tableIdentifier)
+ val fields = catalogTable.schema.fields.filterNot { field =>
+ alterTableVo.newColumns.get.exists { col =>
+ col.getColumnName.equalsIgnoreCase(field.name)
+ }
+ }
+ alterSchema(new StructType(fields), catalogTable, alterTableVo.tableIdentifier)
+ }
+
+ override def alterColumnChangeDataType(alterTableVo: AlterTableVo): Unit = {
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
+ alterTableVo.tableIdentifier)
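+ // Rebuild the schema fields, substituting the new datatype for the changed column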
+ val updatedFields = catalogTable.schema.fields.flatMap { field =>
+ alterTableVo.newColumns.get.map { col =>
+ if (col.getColumnName.equalsIgnoreCase(field.name)) {
+ StructField(col.getColumnName,
+ CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
+ } else {
+ field
+ }
+ }
+ }
+ alterSchema(new StructType(updatedFields), catalogTable, alterTableVo.tableIdentifier)
+ }
+
+ private def alterSchema(structType: StructType,
+ catalogTable: CatalogTable,
+ tableIdentifier: TableIdentifier): Unit = {
+ val copy = catalogTable.copy(schema = structType)
+ sparkSession.sessionState.catalog.alterTable(copy)
+ sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+ }
+
+ lazy val carbonEnv = {
+ val env = new CarbonEnv
+ env.init(sparkSession)
+ env
+ }
+
+ def getCarbonEnv() : CarbonEnv = {
+ carbonEnv
+ }
+
+ // Initialize all listeners to the Operation bus.
+ CarbonEnv.initListeners()
+
+ def getThriftTableInfo(tablePath: String): TableInfo = {
+ val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+ CarbonUtil.readSchemaFile(tableMetadataFile)
+ }
+
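+ // Look up the relation; if the cached carbon relation is stale, refresh it and look it up again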
+ override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+ val rtnRelation = super.lookupRelation(name)
+ val isRelationRefreshed =
+ CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+ if (isRelationRefreshed) {
+ super.lookupRelation(name)
+ } else {
+ rtnRelation
+ }
+ }
+
+ /**
+ * Returns the hive client from HiveExternalCatalog. Not applicable for the in-memory
+ * catalog, so this returns null.
+ *
+ * @return
+ */
+ def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+ null
+ }
+
+ override def createPartitions(
+ tableName: TableIdentifier,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
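+ // Update the partition values for the carbon table; fall back to the original partitions on failure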
+ try {
+ val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+ super.createPartitions(tableName, updatedParts, ignoreIfExists)
+ } catch {
+ case e: Exception =>
+ super.createPartitions(tableName, parts, ignoreIfExists)
+ }
+ }
+
+ /**
+ * This is an alternate way of getting partition information. It first fetches all partitions
+ * from hive and then applies the filters, instead of querying hive along with the filters.
+ * @param partitionFilters
+ * @param sparkSession
+ * @param identifier
+ * @return
+ */
+ override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+ sparkSession: SparkSession,
+ identifier: TableIdentifier) = {
+ CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+ }
+
+ /**
+ * Update the storage format with the new location information
+ */
+ override def updateStorageLocation(
+ path: Path,
+ storage: CatalogStorageFormat,
+ newTableName: String,
+ dbName: String): CatalogStorageFormat = {
+ storage.copy(locationUri = Some(path.toUri))
+ }
+}
+
+class CarbonSessionStateBuilderWithoutHive (sparkSession: SparkSession,
--- End diff ---
change to `CarbonInMemorySessionStateBuilder`
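
For example, the renamed declaration could look roughly like this (a sketch only; the constructor parameters are assumed to stay exactly as in the current `CarbonSessionStateBuilderWithoutHive`, mirroring Spark 2.2's `SessionStateBuilder`, and only the class name changes):

```scala
// Sketch of the suggested rename; the parameter list is assumed unchanged from the
// existing builder (a SparkSession plus an optional parent SessionState).
class CarbonInMemorySessionStateBuilder(
    sparkSession: SparkSession,
    parentState: Option[SessionState] = None)
  extends SessionStateBuilder(sparkSession, parentState) {
  // body unchanged from CarbonSessionStateBuilderWithoutHive; only the name differs
}
```

That keeps the builder name consistent with `CarbonInMemorySessionCatalog` defined above.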
---