Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r153064235
  
    --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.hive
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
    +import org.apache.spark.sql.catalyst.catalog._
    +import org.apache.spark.sql.catalyst.expressions.ScalarSubquery
    +import org.apache.spark.sql.catalyst.optimizer.Optimizer
    +import org.apache.spark.sql.catalyst.parser.ParserInterface
    +import 
org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateHiveTableContext, 
CreateTableContext}
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, 
LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
    +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, 
DDLStrategy}
    +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
    +import org.apache.spark.sql.hive.client.HiveClient
    +import org.apache.spark.sql.internal.{SQLConf, SessionState}
    +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
    +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, 
CarbonSpark2SqlParser, CarbonSparkSqlParser}
    +
    +import org.apache.carbondata.core.datamap.DataMapStoreManager
    +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    +
    +/**
    + * This class will have carbon catalog and refresh the relation from cache 
if the carbontable in
    + * carbon catalog is not same as cached carbon relation's carbon table
    + *
    + * @param externalCatalog
    + * @param globalTempViewManager
    + * @param sparkSession
    + * @param functionResourceLoader
    + * @param functionRegistry
    + * @param conf
    + * @param hadoopConf
    + */
    +class CarbonSessionCatalog(
    +    externalCatalog: HiveExternalCatalog,
    +    globalTempViewManager: GlobalTempViewManager,
    +    functionRegistry: FunctionRegistry,
    +    sparkSession: SparkSession,
    +    conf: SQLConf,
    +    hadoopConf: Configuration,
    +    parser: ParserInterface,
    +    functionResourceLoader: FunctionResourceLoader)
    +  extends HiveSessionCatalog(
    +    externalCatalog,
    +    globalTempViewManager,
    +    new HiveMetastoreCatalog(sparkSession),
    +    functionRegistry,
    +    conf,
    +    hadoopConf,
    +    parser,
    +    functionResourceLoader
    +  ) {
    +
    +  lazy val carbonEnv = {
    +    val env = new CarbonEnv
    +    env.init(sparkSession)
    +    env
    +  }
    +
    +  def getCarbonEnv() : CarbonEnv = {
    +    carbonEnv
    +  }
    +
    +
    +  private def refreshRelationFromCache(identifier: TableIdentifier,
    +      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): 
Boolean = {
    +    var isRefreshed = false
    +    val storePath = CarbonEnv.getInstance(sparkSession).storePath
    +    carbonEnv.carbonMetastore.
    +      checkSchemasModifiedTimeAndReloadTables(storePath)
    +
    +    val tableMeta = carbonEnv.carbonMetastore
    +      
.getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
    +        carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
    +    if (tableMeta.isEmpty || (tableMeta.isDefined &&
    +                              
tableMeta.get.carbonTable.getTableLastUpdatedTime !=
    +                              
carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
    +      refreshTable(identifier)
    +      DataMapStoreManager.getInstance().
    +        clearDataMap(AbsoluteTableIdentifier.from(storePath,
    +          identifier.database.getOrElse("default"), identifier.table))
    +      isRefreshed = true
    +      logInfo(s"Schema changes have been detected for table: $identifier")
    +    }
    +    isRefreshed
    +  }
    +
    +
    +  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
    +    val rtnRelation = super.lookupRelation(name)
    +    var toRefreshRelation = false
    +    rtnRelation match {
    +      case SubqueryAlias(_,
    +      LogicalRelation(carbonDatasourceHadoopRelation: 
CarbonDatasourceHadoopRelation, _, _)) =>
    +        toRefreshRelation = refreshRelationFromCache(name, 
carbonDatasourceHadoopRelation)
    +      case LogicalRelation(carbonDatasourceHadoopRelation: 
CarbonDatasourceHadoopRelation, _, _) =>
    +        toRefreshRelation = refreshRelationFromCache(name, 
carbonDatasourceHadoopRelation)
    +      case _ =>
    +    }
    +
    +    if (toRefreshRelation) {
    +      super.lookupRelation(name)
    +    } else {
    +      rtnRelation
    +    }
    +  }
    +}
    +
    +/**
    + * Session state implementation to override sql parser and adding 
strategies
    + *
    + * @param sparkSession
    + */
    +class CarbonSessionStateBuilder(sparkSession: SparkSession,
    +    parentState: Option[SessionState] = None)
    +  extends HiveSessionStateBuilder(sparkSession, parentState) {
    +
    +  override lazy val sqlParser: ParserInterface = new 
CarbonSparkSqlParser(conf, sparkSession)
    +
    +  experimentalMethods.extraStrategies =
    +    Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
    +  experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
    +
    +  /**
    +   * Internal catalog for managing table and database states.
    +   */
    +  /**
    +   * Create a [[CarbonSessionCatalogBuild]].
    +   */
    +  override protected lazy val catalog: CarbonSessionCatalog = {
    +    val catalog = new CarbonSessionCatalog(
    +      externalCatalog,
    +      session.sharedState.globalTempViewManager,
    +      functionRegistry,
    +      sparkSession,
    +      conf,
    +      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, 
conf),
    +      sqlParser,
    +      resourceLoader)
    +    parentState.foreach(_.catalog.copyStateTo(catalog))
    +    catalog
    +  }
    +
    +  private def externalCatalog: HiveExternalCatalog =
    +    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
    +
    +  /**
    +   * Create a Hive aware resource loader.
    +   */
    +  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
    +    val client: HiveClient = externalCatalog.client.newSession()
    +    new HiveSessionResourceLoader(session, client)
    +  }
    +
    +  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, 
conf, experimentalMethods)
    +
    +  override protected def analyzer: Analyzer = {
    +    new Analyzer(catalog, conf) {
    +
    +      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    +        new ResolveHiveSerdeTable(session) +:
    +        new FindDataSourceTable(session) +:
    +        new ResolveSQLOnFile(session) +:
    +        new CarbonIUDAnalysisRule(sparkSession) +:
    +        CarbonPreInsertionCasts +: customResolutionRules
    +
    +      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    +      PreWriteCheck :: HiveOnlyCheck :: Nil
    +
    +      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    +        new DetermineTableStats(session) +:
    +        RelationConversions(conf, catalog) +:
    +        PreprocessTableCreation(session) +:
    +        PreprocessTableInsertion(conf) +:
    +        DataSourceAnalysis(conf) +:
    +        HiveAnalysis +:
    +        customPostHocResolutionRules
    +    }
    +  }
    +
    +  override protected def newBuilder: NewBuilder = new 
CarbonSessionStateBuilder(_, _)
    +
    +}
    +
    +
    +class CarbonOptimizer(
    +    catalog: SessionCatalog,
    +    conf: SQLConf,
    +    experimentalMethods: ExperimentalMethods)
    +  extends SparkOptimizer(catalog, conf, experimentalMethods) {
    +
    +  override def execute(plan: LogicalPlan): LogicalPlan = {
    +    // In case scalar subquery add flag in relation to skip the decoder 
plan in optimizer rule, And
    +    // optimize whole plan at once.
    +    val transFormedPlan = plan.transform {
    +      case filter: Filter =>
    +        filter.transformExpressions {
    +          case s: ScalarSubquery =>
    +            val tPlan = s.plan.transform {
    +              case lr: LogicalRelation
    +                if 
lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +                
lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
    +                lr
    +            }
    +            ScalarSubquery(tPlan, s.children, s.exprId)
    +        }
    +    }
    +    super.execute(transFormedPlan)
    +  }
    +}
    +
    +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) 
extends
    +  SparkSqlAstBuilder(conf) {
    +
    +  val helper = new CarbonHelperqlAstBuilder(conf, parser)
    +
    +  override def visitCreateHiveTable(ctx: CreateHiveTableContext): 
LogicalPlan = {
    +    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
    +
    +    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
    +        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
    +      helper
    +        .createCarbontable(ctx.createTableHeader,
    --- End diff --
    
    move line up


---

Reply via email to