Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2103#discussion_r179950352
  
    --- Diff: 
integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionStateWithoutHive.scala
 ---
    @@ -0,0 +1,253 @@
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog._
    +import org.apache.spark.sql.catalyst.expressions.Expression
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.command.{AlterTableVo, TableRenameVo}
    +import org.apache.spark.sql.execution.datasources._
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, 
DDLStrategy, StreamingTableStrategy}
    +import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, 
SessionState, SessionStateBuilder}
    +import org.apache.spark.sql.optimizer.{CarbonIUDRule, 
CarbonLateDecodeRule, CarbonUDFTransformRule}
    +import org.apache.spark.sql.parser.CarbonSparkSqlParser
    +import org.apache.spark.sql.types.{StructField, StructType}
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, 
SparkSession}
    +import org.apache.spark.util.CarbonReflectionUtils
    +
    +import org.apache.carbondata.core.util.CarbonUtil
    +import org.apache.carbondata.core.util.path.CarbonTablePath
    +import org.apache.carbondata.format.TableInfo
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * This class will have carbon catalog and refresh the relation from cache 
if the carbontable in
    + * carbon catalog is not same as cached carbon relation's carbon table
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonInMemorySessionCatalog(
    +    externalCatalog: ExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends SessionCatalog(
    +    externalCatalog,
    +    globalTempViewManager,
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) with CarbonSessionCatalog {
    +
    +  override def alterTableRename(tableRenameVo: TableRenameVo): Unit = {
    +    sparkSession.sessionState.catalog.renameTable(
    +      tableRenameVo.oldTableIdentifier,
    +      tableRenameVo.newTableIdentifier)
    +  }
    +
    +  override def alterTable(alterTableVo: AlterTableVo) : Unit = {
    +    // NOt Required in case of In-memory catalog
    +  }
    +
    +  override def alterAddColumns(alterTableVo: AlterTableVo): Unit = {
    +    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
    +      alterTableVo.tableIdentifier)
    +    val structType = catalogTable.schema
    +    var newStructType = structType
    +    alterTableVo.newColumns.get.foreach {cols =>
    +      newStructType = structType
    +        .add(cols.getColumnName, 
CarbonScalaUtil.convertCarbonToSparkDataType(cols.getDataType))
    +    }
    +    alterSchema(newStructType, catalogTable, alterTableVo.tableIdentifier)
    +  }
    +
    +  override def alterDropColumns(alterTableVo: AlterTableVo): Unit = {
    +    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
    +      alterTableVo.tableIdentifier)
    +    val fields = catalogTable.schema.fields.filterNot { field =>
    +      alterTableVo.newColumns.get.exists { col =>
    +        col.getColumnName.equalsIgnoreCase(field.name)
    +      }
    +    }
    +    alterSchema(new StructType(fields), catalogTable, 
alterTableVo.tableIdentifier)
    +  }
    +
    +  override def alterColumnChangeDataType(alterTableVo: AlterTableVo): Unit 
= {
    +    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(
    +      alterTableVo.tableIdentifier)
    +    val a = catalogTable.schema.fields.flatMap { field =>
    +      alterTableVo.newColumns.get.map { col =>
    +        if (col.getColumnName.equalsIgnoreCase(field.name)) {
    +          StructField(col.getColumnName,
    +            CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
    +        } else {
    +          field
    +        }
    +      }
    +    }
    +    alterSchema(new StructType(a), catalogTable, 
alterTableVo.tableIdentifier)
    +  }
    +
    +  private def alterSchema(structType: StructType,
    +      catalogTable: CatalogTable,
    +      tableIdentifier: TableIdentifier): Unit = {
    +    val copy = catalogTable.copy(schema = structType)
    +    sparkSession.sessionState.catalog.alterTable(copy)
    +    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
    +  }
    +
    +  lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +
    +  def getCarbonEnv() : CarbonEnv = {
    +    carbonEnv
    +  }
    +
    +  // Initialize all listeners to the Operation bus.
    +  CarbonEnv.initListeners()
    +
    +  def getThriftTableInfo(tablePath: String): TableInfo = {
    +    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
    +    CarbonUtil.readSchemaFile(tableMetadataFile)
    +  }
    +
    +  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
    +    val rtnRelation = super.lookupRelation(name)
    +    val isRelationRefreshed =
    +      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
    +    if (isRelationRefreshed) {
    +      super.lookupRelation(name)
    +    } else {
    +      rtnRelation
    +    }
    +  }
    +
    +  /**
    +   * returns hive client from HiveExternalCatalog
    +   *
    +   * @return
    +   */
    +  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
    +    null
    +  }
    +
    +  override def createPartitions(
    +      tableName: TableIdentifier,
    +      parts: Seq[CatalogTablePartition],
    +      ignoreIfExists: Boolean): Unit = {
    +    try {
    +      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
    +      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
    +      super.createPartitions(tableName, updatedParts, ignoreIfExists)
    +    } catch {
    +      case e: Exception =>
    +        super.createPartitions(tableName, parts, ignoreIfExists)
    +    }
    +  }
    +
    +  /**
    +   * This is alternate way of getting partition information. It first 
fetches all partitions from
    +   * hive and then apply filter instead of querying hive along with 
filters.
    +   * @param partitionFilters
    +   * @param sparkSession
    +   * @param identifier
    +   * @return
    +   */
    +  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
    +      sparkSession: SparkSession,
    +      identifier: TableIdentifier) = {
    +    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, 
sparkSession, identifier)
    +  }
    +
    +  /**
    +   * Update the storageformat with new location information
    +   */
    +  override def updateStorageLocation(
    +      path: Path,
    +      storage: CatalogStorageFormat,
    +      newTableName: String,
    +      dbName: String): CatalogStorageFormat = {
    +    storage.copy(locationUri = Some(path.toUri))
    +  }
    +}
    +
    +class CarbonSessionStateBuilderWithoutHive (sparkSession: SparkSession,
    --- End diff --
    
    change to `CarbonInMemorySessionStateBuilder`


---

Reply via email to