This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 819bf4e  [CARBONDATA-3668] CarbonSession should use old flow (non-CarbonExtensions flow)
819bf4e is described below

commit 819bf4e075bff24f4d64eab8ce0a35899637925b
Author: QiangCai <qiang...@qq.com>
AuthorDate: Mon Feb 3 17:38:30 2020 +0800

    [CARBONDATA-3668] CarbonSession should use old flow (non-CarbonExtensions flow)
    
    Why is this PR needed?
    Considering backward compatibility, CarbonSession should keep using the old flow (not the CarbonExtensions flow); a sketch of both flows is included at the end of this message.
    
    What changes were proposed in this PR?
    1. Remove CarbonExtensions from CarbonSession
    2. Recover CarbonSessionCatalog
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3586
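
    For context, a minimal sketch of the two session flows this change keeps separate (illustrative
    only: the application names and store path below are placeholders; getOrCreateCarbonSession is
    the existing CarbonSession builder entry point, and the spark.sql.extensions config key appears
    in the CarbonThriftServer change below):

        import org.apache.spark.sql.SparkSession

        // Old flow (kept for backward compatibility): CarbonSession builder, no CarbonExtensions
        import org.apache.spark.sql.CarbonSession._
        val carbon = SparkSession.builder()
          .master("local[*]")
          .appName("CarbonSessionSketch")
          .getOrCreateCarbonSession("/tmp/carbon.store")  // placeholder store location

        // New flow: plain SparkSession with CarbonExtensions registered via spark.sql.extensions
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("CarbonExtensionsSketch")
          .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
          .enableHiveSupport()
          .getOrCreate()
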
---
 .../carbondata/examples/CarbonSessionExample.scala |  13 +-
 .../org/apache/carbondata/examples/S3Example.scala |   1 -
 .../carbondata/examples/S3UsingSDkExample.scala    |   1 -
 .../spark/thriftserver/CarbonThriftServer.scala    |  77 ++++----
 .../carbondata/spark/util/CarbonSparkUtil.scala    |  11 ++
 .../org/apache/spark/sql/CarbonExtensions.scala    |   5 +-
 .../scala/org/apache/spark/sql/CarbonSession.scala |  23 ---
 .../spark/sql/hive/CarbonSessionCatalog.scala      | 102 ++++++++++
 .../spark/sql/hive/CarbonSessionCatalogUtil.scala  | 205 ++++++++++++++++++++-
 9 files changed, 361 insertions(+), 77 deletions(-)

diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index d5c1188..e3411aa 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -40,11 +40,18 @@ object CarbonSessionExample {
       .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
     val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
     spark.sparkContext.setLogLevel("error")
-    exampleBody(spark)
+    Seq(
+      "stored as carbondata",
+      "using carbondata",
+      "stored by 'carbondata'",
+      "stored by 'org.apache.carbondata.format'"
+    ).foreach { formatSyntax =>
+      exampleBody(spark, formatSyntax)
+    }
     spark.close()
   }
 
-  def exampleBody(spark : SparkSession): Unit = {
+  def exampleBody(spark : SparkSession, formatSyntax: String = "stored as carbondata"): Unit = {
 
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
@@ -66,7 +73,7 @@ object CarbonSessionExample {
          | charField CHAR(5),
          | floatField FLOAT
          | )
-         | STORED AS carbondata
+         | $formatSyntax
        """.stripMargin)
 
     val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
index 8774236..1ae1dec 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
@@ -41,7 +41,6 @@ object S3Example {
     val path = s"$rootPath/examples/spark2/src/main/resources/data1.csv"
     val logger: Logger = LoggerFactory.getLogger(this.getClass)
 
-    import org.apache.spark.sql.CarbonUtils._
     if (args.length < 3 || args.length > 5) {
       logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
         "<table-path-on-s3> [s3-endpoint] [spark-master]")
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index d3b67bc..5470ae2 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -81,7 +81,6 @@ object S3UsingSdkExample {
   def main(args: Array[String]) {
     val logger: Logger = LoggerFactory.getLogger(this.getClass)
 
-    import org.apache.spark.sql.CarbonUtils._
     if (args.length < 2 || args.length > 6) {
       logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
         "[table-path-on-s3] [s3-endpoint] [number-of-rows] [spark-master]")
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index ece9319..fc70757 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -28,58 +28,37 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
- /**
-  * CarbonThriftServer support different modes:
-  * 1. read/write data from/to HDFS or local,it only needs configurate storePath
-  * 2. read/write data from/to S3, it needs provide access-key, secret-key, s3-endpoint
-  */
+/**
+ * CarbonThriftServer support different modes:
+ * 1. read/write data from/to HDFS or local, no parameter is required in this case
+ * 2. read/write data from/to S3, it needs provide access-key, secret-key, s3-endpoint
+ */
 object CarbonThriftServer {
 
   def main(args: Array[String]): Unit = {
-
-    import org.apache.spark.sql.CarbonUtils._
-
-    val sparkConf = new SparkConf(loadDefaults = true)
-
-    val logger: Logger = LoggerFactory.getLogger(this.getClass)
-    if (args.length != 0 && args.length != 1 && args.length != 4) {
-      logger.error("parameters: storePath [access-key] [secret-key] [s3-endpoint]")
+    if (args.length != 0 && args.length != 3) {
+      val logger: Logger = LoggerFactory.getLogger(this.getClass)
+      logger.error("parameters: [access-key] [secret-key] [s3-endpoint]")
       System.exit(0)
     }
-
+    val sparkConf = new SparkConf(loadDefaults = true)
     val builder = SparkSession
       .builder()
       .config(sparkConf)
-      .appName("Carbon Thrift Server(uses CarbonSession)")
+      .appName("Carbon Thrift Server(uses CarbonExtensions)")
       .enableHiveSupport()
       .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
-
-    if (!sparkConf.contains("carbon.properties.filepath")) {
-      val sparkHome = System.getenv.get("SPARK_HOME")
-      if (null != sparkHome) {
-        val file = new File(sparkHome + '/' + "conf" + '/' + "carbon.properties")
-        if (file.exists()) {
-          builder.config("carbon.properties.filepath", file.getCanonicalPath)
-          System.setProperty("carbon.properties.filepath", file.getCanonicalPath)
-        }
-      }
-    } else {
-      System.setProperty("carbon.properties.filepath", sparkConf.get("carbon.properties.filepath"))
-    }
-
-    val storePath = if (args.length > 0) args.head else null
-
-    val spark = if (args.length <= 1) {
-      builder.getOrCreate()
-    } else {
-      val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(args(0))
-      builder.config(accessKey, args(1))
-        .config(secretKey, args(2))
-        .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
-        .getOrCreate()
+    configPropertiesFile(sparkConf, builder)
+    if (args.length == 3) {
+      builder.config(CarbonSparkUtil.getSparkConfForS3(args(0), args(1), args(2)))
     }
+    val spark = builder.getOrCreate()
     CarbonEnv.getInstance(spark)
+    waitingForSparkLaunch()
+    HiveThriftServer2.startWithContext(spark.sqlContext)
+  }
 
+  private def waitingForSparkLaunch(): Unit = {
     val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
     try {
       Thread.sleep(Integer.parseInt(warmUpTime))
@@ -87,11 +66,25 @@ object CarbonThriftServer {
       case e: Exception =>
         val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
-          "Using default Value and proceeding")
+                  "Using default Value and proceeding")
         Thread.sleep(5000)
     }
-
-    HiveThriftServer2.startWithContext(spark.sqlContext)
   }
 
+  private def configPropertiesFile(sparkConf: SparkConf, builder: SparkSession.Builder): Unit = {
+    sparkConf.contains("carbon.properties.filepath") match {
+      case false =>
+        val sparkHome = System.getenv.get("SPARK_HOME")
+        if (null != sparkHome) {
+          val file = new File(sparkHome + '/' + "conf" + '/' + "carbon.properties")
+          if (file.exists()) {
+            builder.config("carbon.properties.filepath", file.getCanonicalPath)
+            System.setProperty("carbon.properties.filepath", file.getCanonicalPath)
+          }
+        }
+      case true =>
+        System.setProperty(
+          "carbon.properties.filepath", sparkConf.get("carbon.properties.filepath"))
+    }
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 700cf91..24cc323 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, FloatType, MapType, StructField, StructType}
+import org.apache.spark.SparkConf
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
@@ -104,6 +105,16 @@ object CarbonSparkUtil {
       delimiter
   }
 
+  def getSparkConfForS3(accessKey: String, secretKey: String, endpoint: String): SparkConf = {
+    val sparkConf = new SparkConf(false)
+    val prefix = "spark.hadoop."
+    Seq(ACCESS_KEY, CarbonCommonConstants.S3N_ACCESS_KEY, CarbonCommonConstants.S3_ACCESS_KEY)
+      .foreach(key => sparkConf.set(prefix + key, accessKey))
+    Seq(SECRET_KEY, CarbonCommonConstants.S3N_SECRET_KEY, CarbonCommonConstants.S3_SECRET_KEY)
+      .foreach(key => sparkConf.set(prefix + key, secretKey))
+    sparkConf.set(prefix + ENDPOINT, endpoint)
+  }
+
   def getKeyOnPrefix(path: String): (String, String, String) = {
     val prefix = "spark.hadoop."
     val endPoint = prefix + ENDPOINT
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
index 44717ad..aefbfba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExtensions.scala
@@ -17,14 +17,11 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonMVRules, CarbonPreInsertionCasts}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonPreInsertionCasts}
 import org.apache.spark.sql.parser.CarbonExtensionSqlParser
 
 class CarbonExtensions extends ((SparkSessionExtensions) => Unit) {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 39ae15d..78b9a7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -236,7 +236,6 @@ object CarbonSession {
           if (!sparkConf.contains("spark.app.name")) {
             sparkConf.setAppName(randomAppName)
           }
-          sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
           val sc = SparkContext.getOrCreate(sparkConf)
           // maybe this is an existing SparkContext, update its SparkConf which maybe used
           // by SparkSession
@@ -247,30 +246,8 @@ object CarbonSession {
           sc
         }
 
-        // Initialize extensions if the user has defined a configurator class.
-        val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
-        val extensionInstance : SparkSessionExtensions = new SparkSessionExtensions
-        if (extensionConfOption.isDefined) {
-          val extensionConfClassName = extensionConfOption.get
-          try {
-            val extensionConfClass = Utils.classForName(extensionConfClassName)
-            val ex = extensionConfClass.newInstance()
-              .asInstanceOf[(SparkSessionExtensions) => Unit]
-            ex(extensionInstance)
-
-          } catch {
-            // Ignore the error if we cannot find the class or when the class has the wrong type.
-            case e @ (_: ClassCastException |
-                      _: ClassNotFoundException |
-                      _: NoClassDefFoundError) =>
-            // Ignore extensions
-          }
-        }
-
         session = new CarbonSession(sparkContext, None, !enableInMemCatlog)
 
-        CarbonReflectionUtils.setSuperFieldToClass(session, "extensions", extensionInstance)
-
         val carbonProperties = CarbonProperties.getInstance()
         if (StringUtils.isNotBlank(storePath)) {
           carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
new file mode 100644
index 0000000..35e9c0b
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -0,0 +1,102 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
+
+/**
+ * This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration,
+ * but are not defined in SessionCatalog or HiveSessionCatalog to give contract to the
+ * Concrete implementation classes.
+ * For example CarbonSessionCatalog defined in 2.1 and 2.2.
+ *
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+trait CarbonSessionCatalog {
+  /**
+   * implementation to be provided by each CarbonSessionCatalog based on on used ExternalCatalog
+   *
+   * @return
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient
+
+  /**
+   * The method returns the CarbonEnv instance
+   *
+   * @return
+   */
+  def getCarbonEnv(): CarbonEnv
+
+  /**
+   * This is alternate way of getting partition information. It first fetches all partitions from
+   * hive and then apply filter instead of querying hive along with filters.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def getPartitionsAlternate(partitionFilters: Seq[Expression], sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition]
+
+  /**
+   * Update the storageformat with new location information
+   */
+  def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat
+
+  /**
+   * Below method will be used to add new column
+   * @param tableIdentifier table identifier
+   * @param schemaParts schema parts
+   * @param cols cols
+   */
+  def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]]): Unit
+
+  /**
+   * Below method will be used to drop column
+   * @param tableIdentifier table identifier
+   * @param schemaParts schema parts
+   * @param cols cols
+   */
+  def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]]): Unit
+
+  /**
+   * Below method will be used to alter data type of column in schema
+   * @param tableIdentifier table identifier
+   * @param schemaParts schema parts
+   * @param cols cols
+   */
+  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]]): Unit
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
index 5f9351b..dd2d751 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
@@ -17,11 +17,22 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
+import java.util.concurrent.Callable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
@@ -161,4 +172,192 @@ object CarbonSessionCatalogUtil {
     CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
   }
 
+  /**
+   * Update the storageformat with new location information
+   */
+  def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonHiveSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog (
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  private lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  /**
+   * return's the carbonEnv instance
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.init
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    var rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      rtnRelation = super.lookupRelation(name)
+      // Reset the stats after lookup.
+      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
+    }
+    rtnRelation
+  }
+
+  override def getCachedPlan(t: QualifiedTableName,
+      c: Callable[LogicalPlan]): LogicalPlan = {
+    val plan = super.getCachedPlan(t, c)
+    CarbonSessionUtil.updateCachedPlan(plan)
+  }
+
+  /**
+   * returns hive client from HiveExternalCatalog
+   *
+   * @return
+   */
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    CarbonSessionCatalogUtil.getClient(sparkSession)
+  }
+
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]]): Unit = {
+    CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
+  }
+
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]]): Unit = {
+    CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier, schemaParts, cols, sparkSession)
+  }
+
+  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[ColumnSchema]]): Unit = {
+    CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
+      tableIdentifier, schemaParts, cols, sparkSession)
+  }
+
+  /**
+   * This is alternate way of getting partition information. It first fetches all partitions from
+   * hive and then apply filter instead of querying hive along with filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession, identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+    CarbonSessionCatalogUtil.getPartitionsAlternate(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Update the storageformat with new location information
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    CarbonSessionCatalogUtil.updateStorageLocation(path, storage, newTableName, dbName)
+  }
+}
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule)
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  /**
+   * Create a [[CarbonSessionStateBuilder]].
+   */
+  override protected lazy val catalog: CarbonHiveSessionCatalog = {
+    val catalog = new CarbonHiveSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  override protected def analyzer: Analyzer = {
+    new CarbonAnalyzer(catalog,
+      conf,
+      sparkSession,
+      super.analyzer)
+  }
 }
