Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1469#discussion_r149264766
--- Diff: integration/spark2.2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+/**
+ * This class holds the carbon catalog and refreshes the relation from the cache if the carbon
+ * table in the carbon catalog is not the same as the cached carbon relation's carbon table.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+ externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
+ functionRegistry: FunctionRegistry,
+ sparkSession: SparkSession,
+ conf: SQLConf,
+ hadoopConf: Configuration,
+ parser: ParserInterface,
+ functionResourceLoader: FunctionResourceLoader)
+ extends HiveSessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ new HiveMetastoreCatalog(sparkSession),
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser,
+ functionResourceLoader
+ ) {
+
+ lazy val carbonEnv = {
+ val env = new CarbonEnv
+ env.init(sparkSession)
+ env
+ }
+
+
+ private def refreshRelationFromCache(identifier: TableIdentifier,
+ alias: Option[String],
+ carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+ var isRefreshed = false
+ val storePath = CarbonEnv.getInstance(sparkSession).storePath
+ carbonEnv.carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables(storePath)
+
+ val tableMeta = carbonEnv.carbonMetastore
+ .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+ carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
+ if (tableMeta.isEmpty || (tableMeta.isDefined &&
+ tableMeta.get.carbonTable.getTableLastUpdatedTime !=
+ carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+ refreshTable(identifier)
+ DataMapStoreManager.getInstance().
+ clearDataMap(AbsoluteTableIdentifier.from(storePath,
+ identifier.database.getOrElse("default"), identifier.table))
+ isRefreshed = true
+ logInfo(s"Schema changes have been detected for table: $identifier")
+ }
+ isRefreshed
+ }
+
+ // override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
+ // makeFunctionBuilder(funcName, Utils.classForName(className))
+ // }
+ //
+ // /**
+ // * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ // */
+ // private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
+ // // When we instantiate hive UDF wrapper class, we may throw exception if the input
+ // // expressions don't satisfy the hive UDF, such as type mismatch, input number
+ // // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
+ // (children: Seq[Expression]) => {
+ // try {
+ // if (classOf[UDF].isAssignableFrom(clazz)) {
+ // val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ // udf.dataType // Force it to check input data types.
+ // udf
+ // } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
+ // val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ // udf.dataType // Force it to check input data types.
+ // udf
+ // } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
+ // val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
+ // udaf.dataType // Force it to check input data types.
+ // udaf
+ // } else if (classOf[UDAF].isAssignableFrom(clazz)) {
+ // val udaf = HiveUDAFFunction(
+ // name,
+ // new HiveFunctionWrapper(clazz.getName),
+ // children,
+ // isUDAFBridgeRequired = true)
+ // udaf.dataType // Force it to check input data types.
+ // udaf
+ // } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
+ // val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
+ // udtf.elementSchema // Force it to check input data types.
+ // udtf
+ // } else {
+ // throw new AnalysisException(s"No handler for Hive UDF
'${clazz.getCanonicalName}'")
+ // }
+ // } catch {
+ // case ae: AnalysisException =>
+ // throw ae
+ // case NonFatal(e) =>
+ // val analysisException =
+ // new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+ // analysisException.setStackTrace(e.getStackTrace)
+ // throw analysisException
+ // }
+ // }
+ // }
+ //
+ // override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
+ // try {
+ // lookupFunction0(name, children)
+ // } catch {
+ // case NonFatal(_) =>
+ // // SPARK-16228 ExternalCatalog may recognize `double`-type only.
+ // val newChildren = children.map { child =>
+ // if (child.dataType.isInstanceOf[DecimalType]) Cast(child, DoubleType) else child
+ // }
+ // lookupFunction0(name, newChildren)
+ // }
+ // }
+ //
+ // private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
+ // val database = name.database.map(formatDatabaseName)
+ // val funcName = name.copy(database = database)
+ // Try(super.lookupFunction(funcName, children)) match {
+ // case Success(expr) => expr
+ // case Failure(error) =>
+ // if (functionRegistry.functionExists(funcName.unquotedString)) {
+ // // If the function actually exists in functionRegistry, it means that there is an
+ // // error when we create the Expression using the given children.
+ // // We need to throw the original exception.
+ // throw error
+ // } else {
+ // // This function is not in functionRegistry, let's try to load it as a Hive's
+ // // built-in function.
+ // // Hive is case insensitive.
+ // val functionName = funcName.unquotedString.toLowerCase(Locale.ROOT)
+ // if (!hiveFunctions.contains(functionName)) {
+ // failFunctionLookup(funcName)
+ // }
+ //
+ // // TODO: Remove this fallback path once we implement the list of fallback functions
+ // // defined below in hiveFunctions.
+ // val functionInfo = {
+ // try {
+ // Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
+ // failFunctionLookup(funcName))
+ // } catch {
+ // // If HiveFunctionRegistry.getFunctionInfo throws an exception,
+ // // we are failing to load a Hive builtin function, which means that
+ // // the given function is not a Hive builtin function.
+ // case NonFatal(e) => failFunctionLookup(funcName)
+ // }
+ // }
+ // val className = functionInfo.getFunctionClass.getName
+ // val functionIdentifier =
+ // FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
+ // val func = CatalogFunction(functionIdentifier, className, Nil)
+ // // Put this Hive built-in function to our function registry.
+ // registerFunction(func, ignoreIfExists = false)
+ // // Now, we need to create the Expression.
+ // functionRegistry.lookupFunction(functionName, children)
+ // }
+ // }
+ // }
+ //
+ // // TODO Removes this method after implementing Spark native "histogram_numeric".
+ // override def functionExists(name: FunctionIdentifier): Boolean = {
+ // super.functionExists(name) || hiveFunctions.contains(name.funcName)
+ // }
+ //
+ // /** List of functions we pass over to Hive. Note that over time this list should go to 0. */
+ // // We have a list of Hive built-in functions that we do not support. So, we will check
+ // // Hive's function registry and lazily load needed functions into our own function registry.
+ // // List of functions we are explicitly not supporting are:
+ // // compute_stats, context_ngrams, create_union,
+ // // current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
+ // // in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap,
+ // // noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction.
+ // // Note: don't forget to update SessionCatalog.isTemporaryFunction
+ // private val hiveFunctions = Seq(
+ // "histogram_numeric"
+ // )
+}
+
+/**
+ * Session state implementation that overrides the SQL parser and adds strategies
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
+ extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+ experimentalMethods.extraStrategies =
+ Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+ experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ override lazy val catalog : CarbonSessionCatalog= {
--- End diff --
white space before '='
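
If the nit is about normalizing the spacing around ':' and '=', the corrected declaration would presumably look something like the following (a sketch of the fixed line only, not part of the original diff):

    override lazy val catalog: CarbonSessionCatalog = {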
---