GitHub user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r153064235 --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.expressions.ScalarSubquery +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateHiveTableContext, CreateTableContext} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _} +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy} +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.{SQLConf, SessionState} +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule +import org.apache.spark.sql.parser.{CarbonHelperqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier + +/** + * This class will have carbon catalog and refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table + * + * @param externalCatalog + * @param globalTempViewManager + * @param sparkSession + * @param functionResourceLoader + * @param functionRegistry + * @param conf + * @param hadoopConf + */ +class CarbonSessionCatalog( + externalCatalog: HiveExternalCatalog, + globalTempViewManager: GlobalTempViewManager, + functionRegistry: FunctionRegistry, + sparkSession: SparkSession, + conf: SQLConf, 
+ hadoopConf: Configuration, + parser: ParserInterface, + functionResourceLoader: FunctionResourceLoader) + extends HiveSessionCatalog( + externalCatalog, + globalTempViewManager, + new HiveMetastoreCatalog(sparkSession), + functionRegistry, + conf, + hadoopConf, + parser, + functionResourceLoader + ) { + + lazy val carbonEnv = { + val env = new CarbonEnv + env.init(sparkSession) + env + } + + def getCarbonEnv() : CarbonEnv = { + carbonEnv + } + + + private def refreshRelationFromCache(identifier: TableIdentifier, + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = { + var isRefreshed = false + val storePath = CarbonEnv.getInstance(sparkSession).storePath + carbonEnv.carbonMetastore. + checkSchemasModifiedTimeAndReloadTables(storePath) + + val tableMeta = carbonEnv.carbonMetastore + .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, + carbonDatasourceHadoopRelation.carbonTable.getFactTableName) + if (tableMeta.isEmpty || (tableMeta.isDefined && + tableMeta.get.carbonTable.getTableLastUpdatedTime != + carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { + refreshTable(identifier) + DataMapStoreManager.getInstance(). 
+ clearDataMap(AbsoluteTableIdentifier.from(storePath, + identifier.database.getOrElse("default"), identifier.table)) + isRefreshed = true + logInfo(s"Schema changes have been detected for table: $identifier") + } + isRefreshed + } + + + override def lookupRelation(name: TableIdentifier): LogicalPlan = { + val rtnRelation = super.lookupRelation(name) + var toRefreshRelation = false + rtnRelation match { + case SubqueryAlias(_, + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => + toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation) + case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation) + case _ => + } + + if (toRefreshRelation) { + super.lookupRelation(name) + } else { + rtnRelation + } + } +} + +/** + * Session state implementation to override sql parser and adding strategies + * + * @param sparkSession + */ +class CarbonSessionStateBuilder(sparkSession: SparkSession, + parentState: Option[SessionState] = None) + extends HiveSessionStateBuilder(sparkSession, parentState) { + + override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) + + experimentalMethods.extraStrategies = + Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession)) + experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule) + + /** + * Internal catalog for managing table and database states. + */ + /** + * Create a [[CarbonSessionCatalogBuild]]. 
+ */ + override protected lazy val catalog: CarbonSessionCatalog = { + val catalog = new CarbonSessionCatalog( + externalCatalog, + session.sharedState.globalTempViewManager, + functionRegistry, + sparkSession, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + resourceLoader) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog + } + + private def externalCatalog: HiveExternalCatalog = + session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + + /** + * Create a Hive aware resource loader. + */ + override protected lazy val resourceLoader: HiveSessionResourceLoader = { + val client: HiveClient = externalCatalog.client.newSession() + new HiveSessionResourceLoader(session, client) + } + + override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) + + override protected def analyzer: Analyzer = { + new Analyzer(catalog, conf) { + + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new ResolveHiveSerdeTable(session) +: + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + new CarbonIUDAnalysisRule(sparkSession) +: + CarbonPreInsertionCasts +: customResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck :: HiveOnlyCheck :: Nil + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + new DetermineTableStats(session) +: + RelationConversions(conf, catalog) +: + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + HiveAnalysis +: + customPostHocResolutionRules + } + } + + override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _) + +} + + +class CarbonOptimizer( + catalog: SessionCatalog, + conf: SQLConf, + experimentalMethods: ExperimentalMethods) + extends SparkOptimizer(catalog, conf, experimentalMethods) { + + override def execute(plan: LogicalPlan): LogicalPlan = { + // In case scalar subquery 
add flag in relation to skip the decoder plan in optimizer rule, And + // optimize whole plan at once. + val transFormedPlan = plan.transform { + case filter: Filter => + filter.transformExpressions { + case s: ScalarSubquery => + val tPlan = s.plan.transform { + case lr: LogicalRelation + if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true + lr + } + ScalarSubquery(tPlan, s.children, s.exprId) + } + } + super.execute(transFormedPlan) + } +} + +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends + SparkSqlAstBuilder(conf) { + + val helper = new CarbonHelperqlAstBuilder(conf, parser) + + override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = { + val fileStorage = helper.getFileStorage(ctx.createFileFormat) + + if (fileStorage.equalsIgnoreCase("'carbondata'") || + fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { + helper + .createCarbontable(ctx.createTableHeader, --- End diff -- Move this line up: join `.createCarbontable(ctx.createTableHeader,` onto the preceding `helper` line so the call reads `helper.createCarbontable(...)`.
---