Repository: carbondata
Updated Branches:
  refs/heads/master 0e6fe6cae -> 0c200d834


[CARBONDATA-2285] Spark integration code refactor

This closes #2104


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c200d83
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c200d83
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c200d83

Branch: refs/heads/master
Commit: 0c200d83476c97742b999e456617950c0bd49acf
Parents: 0e6fe6c
Author: mohammadshahidkhan <[email protected]>
Authored: Mon Mar 26 16:36:29 2018 +0530
Committer: Jacky Li <[email protected]>
Committed: Thu Mar 29 22:25:10 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  6 ++
 .../hadoop/api/CarbonTableInputFormat.java      | 19 +++-
 .../spark/util/CarbonReflectionUtils.scala      |  6 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  8 +-
 .../spark/sql/hive/CarbonSessionCatalog.scala   | 67 ++++++++++++++
 .../org/apache/spark/util/AlterTableUtil.scala  |  2 +-
 .../spark/sql/hive/CarbonSessionState.scala     | 18 ++--
 .../spark/sql/hive/CarbonSessionState.scala     | 66 ++++++-------
 .../spark/sql/hive/CarbonSessionUtil.scala      | 97 ++++++++++++++++++++
 .../spark/sql/common/util/Spark2QueryTest.scala |  4 +-
 10 files changed, 235 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ff7a4bc..001ee72 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1297,6 +1297,12 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SESSIONSTATE_CLASSNAME = 
"spark.carbon.sessionstate.classname";
 
+  /**
+   * This property will be used to configure the sqlastbuilder class.
+   */
+  public static final String CARBON_SQLASTBUILDER_CLASSNAME =
+      "spark.carbon.sqlastbuilder.classname";
+
   public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME =
       "spark.carbon.common.listener.register.classname";
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 14f4c71..c94f777 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -147,12 +147,13 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       List<Segment> validSegments = segments.getValidSegments();
       streamSegments = segments.getStreamSegments();
-      streamSegments = getFilteredSegment(job,streamSegments);
+      streamSegments = getFilteredSegment(job,streamSegments, true);
       if (validSegments.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
 
-      List<Segment> filteredSegmentToAccess = getFilteredSegment(job, 
segments.getValidSegments());
+      List<Segment> filteredSegmentToAccess = getFilteredSegment(job, 
segments.getValidSegments(),
+          true);
       if (filteredSegmentToAccess.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       } else {
@@ -175,7 +176,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
     validAndInProgressSegments.addAll(segments.getListOfInProgressSegments());
     // get updated filtered list
     List<Segment> filteredSegmentToAccess =
-        getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments));
+        getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), 
false);
     // Clean the updated segments from memory if the update happens on segments
     List<Segment> toBeCleanedSegments = new ArrayList<>();
     for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
@@ -247,7 +248,8 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
    * Return segment list after filtering out valid segments and segments set 
by user by
    * `INPUT_SEGMENT_NUMBERS` in job configuration
    */
-  private List<Segment> getFilteredSegment(JobContext job, List<Segment> 
validSegments) {
+  private List<Segment> getFilteredSegment(JobContext job, List<Segment> 
validSegments,
+      boolean validationRequired) {
     Segment[] segmentsToAccess = getSegmentsToAccess(job);
     List<Segment> segmentToAccessSet =
         new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess)));
@@ -267,6 +269,13 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
           }
         }
       }
+      if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && 
!validationRequired) {
+        for (Segment segment : segmentToAccessSet) {
+          if (!filteredSegmentToAccess.contains(segment)) {
+            filteredSegmentToAccess.add(segment);
+          }
+        }
+      }
       if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
         List<Segment> filteredSegmentToAccessTemp = new 
ArrayList<>(filteredSegmentToAccess);
         filteredSegmentToAccessTemp.removeAll(segmentToAccessSet);
@@ -536,7 +545,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
 
     // TODO: currently only batch segment is supported, add support for 
streaming table
-    List<Segment> filteredSegment = getFilteredSegment(job, 
allSegments.getValidSegments());
+    List<Segment> filteredSegment = getFilteredSegment(job, 
allSegments.getValidSegments(), false);
 
     List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, 
null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 2f15263..69eb021 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -183,8 +183,10 @@ object CarbonReflectionUtils {
       sqlParser: Object,
       sparkSession: SparkSession): AstBuilder = {
     if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
-      createObject(
-        "org.apache.spark.sql.hive.CarbonSqlAstBuilder",
+      val className = sparkSession.sparkContext.conf.get(
+        CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
+        "org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+      createObject(className,
         conf,
         sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 4a76bca..63e0834 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -125,7 +125,7 @@ object CarbonEnv {
 
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
     if (sparkSession.isInstanceOf[CarbonSession]) {
-      
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv
+      
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv()
     } else {
       var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
       if (carbonEnv == null) {
@@ -205,7 +205,11 @@ object CarbonEnv {
   def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: 
SparkSession): Boolean = {
     var isRefreshed = false
     val carbonEnv = getInstance(sparkSession)
-    if 
(carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) {
+    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+      identifier.database.getOrElse("default"), identifier.table)
+    if (table.isEmpty ||
+        (table.isDefined && carbonEnv.carbonMetastore
+          .checkSchemasModifiedTimeAndReloadTable(identifier))) {
       sparkSession.sessionState.catalog.refreshTable(identifier)
       DataMapStoreManager.getInstance().
         
clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
new file mode 100644
index 0000000..1c69309
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -0,0 +1,67 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTablePartition}
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, 
InterfaceStability}
+
+/**
+ * This interface defines the common APIs used by carbon for spark-2.1 and 
spark-2.2 integration,
+ * but are not defined in SessionCatalog or HiveSessionCatalog to give 
contract to the
+ * Concrete implementation classes.
+ * For example CarbonSessionCatalog defined in 2.1 and 2.2.
+ *
+ */
[email protected]
[email protected]
+trait CarbonSessionCatalog {
+  /**
+   * implementation to be provided by each CarbonSessionCatalog based on the 
used ExternalCatalog
+   *
+   * @return
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient
+
+  /**
+   * The method returns the CarbonEnv instance
+   *
+   * @return
+   */
+  def getCarbonEnv(): CarbonEnv
+
+  /**
+   * This is alternate way of getting partition information. It first fetches 
all partitions from
+   * hive and then apply filter instead of querying hive along with filters.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def getPartitionsAlternate(partitionFilters: Seq[Expression], sparkSession: 
SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition]
+
+  /**
+   * Update the storageformat with new location information
+   */
+  def updateStorageLocation(path: Path, storage: CatalogStorageFormat): 
CatalogStorageFormat
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 45e956a..1c6756b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, 
HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 4c2304d..c37e6fa 100644
--- 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -55,7 +55,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  * @param conf
  * @param hadoopConf
  */
-class CarbonSessionCatalog(
+class CarbonHiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
     sparkSession: SparkSession,
@@ -70,14 +70,20 @@ class CarbonSessionCatalog(
     functionResourceLoader,
     functionRegistry,
     conf,
-    hadoopConf) {
+    hadoopConf) with CarbonSessionCatalog {
 
   lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
     env
   }
-
+  /**
+   * returns the carbonEnv instance
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
   // Initialize all listeners to the Operation bus.
   CarbonEnv.init(sparkSession)
 
@@ -138,7 +144,7 @@ class CarbonSessionCatalog(
    *
    * @return
    */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }
 
@@ -187,7 +193,7 @@ class CarbonSessionCatalog(
   /**
    * Update the storageformat with new location information
    */
-  def updateStorageLocation(
+  override def updateStorageLocation(
       path: Path,
       storage: CatalogStorageFormat): CatalogStorageFormat = {
     storage.copy(locationUri = Some(path.toString))
@@ -254,7 +260,7 @@ class CarbonSessionState(sparkSession: SparkSession) 
extends HiveSessionState(sp
    * Internal catalog for managing table and database states.
    */
   override lazy val catalog = {
-    new CarbonSessionCatalog(
+    new CarbonHiveSessionCatalog(
       
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
       sparkSession.sharedState.globalTempViewManager,
       sparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 2ffc7db..479c9ce 100644
--- 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -59,7 +59,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  * @param conf
  * @param hadoopConf
  */
-class CarbonSessionCatalog(
+class CarbonHiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
     functionRegistry: FunctionRegistry,
@@ -68,7 +68,7 @@ class CarbonSessionCatalog(
     hadoopConf: Configuration,
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
-  extends HiveSessionCatalog(
+  extends HiveSessionCatalog (
     externalCatalog,
     globalTempViewManager,
     new HiveMetastoreCatalog(sparkSession),
@@ -77,15 +77,18 @@ class CarbonSessionCatalog(
     hadoopConf,
     parser,
     functionResourceLoader
-  ) {
+  ) with CarbonSessionCatalog {
 
   lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
     env
   }
-
-  def getCarbonEnv() : CarbonEnv = {
+  /**
+   * returns the carbonEnv instance
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
     carbonEnv
   }
 
@@ -97,28 +100,9 @@ class CarbonSessionCatalog(
 
   override def lookupRelation(name: TableIdentifier): LogicalPlan = {
     val rtnRelation = super.lookupRelation(name)
-    var toRefreshRelation = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
-        toRefreshRelation = 
CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = 
CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case SubqueryAlias(_, relation) if
-      
relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation")
 ||
-      
relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation")
 ||
-      relation.getClass.getName.equals(
-        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
-        val catalogTable =
-          CarbonReflectionUtils.getFieldOfCatalogTable(
-            "tableMeta",
-            relation).asInstanceOf[CatalogTable]
-        toRefreshRelation =
-          
CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
-      case _ =>
-    }
-
-    if (toRefreshRelation) {
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
       super.lookupRelation(name)
     } else {
       rtnRelation
@@ -130,7 +114,7 @@ class CarbonSessionCatalog(
    *
    * @return
    */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
     sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
       .asInstanceOf[HiveExternalCatalog].client
   }
@@ -157,21 +141,16 @@ class CarbonSessionCatalog(
    * @param identifier
    * @return
    */
-  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
       identifier: TableIdentifier) = {
-    val allPartitions = 
sparkSession.sessionState.catalog.listPartitions(identifier)
-    ExternalCatalogUtils.prunePartitionsByFilter(
-      sparkSession.sessionState.catalog.getTableMetadata(identifier),
-      allPartitions,
-      partitionFilters,
-      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, 
identifier)
   }
 
   /**
    * Update the storageformat with new location information
    */
-  def updateStorageLocation(
+  override def updateStorageLocation(
       path: Path,
       storage: CatalogStorageFormat): CatalogStorageFormat = {
     storage.copy(locationUri = Some(path.toUri))
@@ -217,8 +196,8 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
   /**
    * Create a [[CarbonSessionCatalogBuild]].
    */
-  override protected lazy val catalog: CarbonSessionCatalog = {
-    val catalog = new CarbonSessionCatalog(
+  override protected lazy val catalog: CarbonHiveSessionCatalog = {
+    val catalog = new CarbonHiveSessionCatalog(
       externalCatalog,
       session.sharedState.globalTempViewManager,
       functionRegistry,
@@ -280,6 +259,13 @@ class CarbonOptimizer(
   extends SparkOptimizer(catalog, conf, experimentalMethods) {
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
+    val transFormedPlan: LogicalPlan = 
CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+    super.execute(transFormedPlan)
+  }
+}
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
     // In case scalar subquery add flag in relation to skip the decoder plan 
in optimizer rule, And
     // optimize whole plan at once.
     val transFormedPlan = plan.transform {
@@ -309,10 +295,10 @@ class CarbonOptimizer(
                 
lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
                 lr
             }
-            In(value, Seq(ListQuery(tPlan, l.children , exprId)))
+            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
         }
     }
-    super.execute(transFormedPlan)
+    transFormedPlan
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
new file mode 100644
index 0000000..ac702ea
--- /dev/null
+++ 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, 
SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * This class refreshes the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table.
+ */
+object CarbonSessionUtil {
+
+  val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
+
+  /**
+   * The method refreshes the cache entry
+   *
+   * @param rtnRelation [[LogicalPlan]] represents the given table or view.
+   * @param name        tableName
+   * @param sparkSession
+   * @return
+   */
+  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    var isRelationRefreshed = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
+      ) =>
+        isRelationRefreshed = 
CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        isRelationRefreshed = 
CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case SubqueryAlias(_, relation) if
+      
relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation")
 ||
+      relation.getClass.getName
+        .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+      relation.getClass.getName.equals(
+        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
+      ) =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation
+          ).asInstanceOf[CatalogTable]
+        isRelationRefreshed =
+          
CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+      case _ =>
+    }
+    isRelationRefreshed
+  }
+
+  /**
+   * This is alternate way of getting partition information. It first fetches 
all partitions from
+   * hive and then apply filter instead of querying hive along with filters.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+    val allPartitions = 
sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
index 169bcf2..ff64c05 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.common.util
 
 import org.apache.spark.sql.CarbonSession
-import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonHiveSessionCatalog, 
HiveExternalCatalog}
 import org.apache.spark.sql.test.util.QueryTest
 
 
 class Spark2QueryTest extends QueryTest {
 
-  val hiveClient = 
sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+  val hiveClient = 
sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonHiveSessionCatalog]
     .getClient()
 
 }
\ No newline at end of file

Reply via email to