Repository: carbondata Updated Branches: refs/heads/master 0e6fe6cae -> 0c200d834
[CARBONDATA-2285] Spark integration code refactor This closes #2104 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c200d83 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c200d83 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c200d83 Branch: refs/heads/master Commit: 0c200d83476c97742b999e456617950c0bd49acf Parents: 0e6fe6c Author: mohammadshahidkhan <[email protected]> Authored: Mon Mar 26 16:36:29 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Thu Mar 29 22:25:10 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 6 ++ .../hadoop/api/CarbonTableInputFormat.java | 19 +++- .../spark/util/CarbonReflectionUtils.scala | 6 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 8 +- .../spark/sql/hive/CarbonSessionCatalog.scala | 67 ++++++++++++++ .../org/apache/spark/util/AlterTableUtil.scala | 2 +- .../spark/sql/hive/CarbonSessionState.scala | 18 ++-- .../spark/sql/hive/CarbonSessionState.scala | 66 ++++++------- .../spark/sql/hive/CarbonSessionUtil.scala | 97 ++++++++++++++++++++ .../spark/sql/common/util/Spark2QueryTest.scala | 4 +- 10 files changed, 235 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index ff7a4bc..001ee72 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1297,6 
+1297,12 @@ public final class CarbonCommonConstants { public static final String CARBON_SESSIONSTATE_CLASSNAME = "spark.carbon.sessionstate.classname"; + /** + * This property will be used to configure the sqlastbuilder class. + */ + public static final String CARBON_SQLASTBUILDER_CLASSNAME = + "spark.carbon.sqlastbuilder.classname"; + public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME = "spark.carbon.common.listener.register.classname"; http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 14f4c71..c94f777 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -147,12 +147,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { if (getValidateSegmentsToAccess(job.getConfiguration())) { List<Segment> validSegments = segments.getValidSegments(); streamSegments = segments.getStreamSegments(); - streamSegments = getFilteredSegment(job,streamSegments); + streamSegments = getFilteredSegment(job,streamSegments, true); if (validSegments.size() == 0) { return getSplitsOfStreaming(job, identifier, streamSegments); } - List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments()); + List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments(), + true); if (filteredSegmentToAccess.size() == 0) { return getSplitsOfStreaming(job, identifier, streamSegments); } else { @@ -175,7 +176,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { 
validAndInProgressSegments.addAll(segments.getListOfInProgressSegments()); // get updated filtered list List<Segment> filteredSegmentToAccess = - getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments)); + getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false); // Clean the updated segments from memory if the update happens on segments List<Segment> toBeCleanedSegments = new ArrayList<>(); for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager @@ -247,7 +248,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { * Return segment list after filtering out valid segments and segments set by user by * `INPUT_SEGMENT_NUMBERS` in job configuration */ - private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments) { + private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments, + boolean validationRequired) { Segment[] segmentsToAccess = getSegmentsToAccess(job); List<Segment> segmentToAccessSet = new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess))); @@ -267,6 +269,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } } } + if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) { + for (Segment segment : segmentToAccessSet) { + if (!filteredSegmentToAccess.contains(segment)) { + filteredSegmentToAccess.add(segment); + } + } + } if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) { List<Segment> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess); filteredSegmentToAccessTemp.removeAll(segmentToAccessSet); @@ -536,7 +545,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { Map<String, Long> segmentAndBlockCountMapping = new HashMap<>(); // TODO: currently only batch segment is supported, add support for streaming table - List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments()); + List<Segment> filteredSegment = 
getFilteredSegment(job, allSegments.getValidSegments(), false); List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions); for (ExtendedBlocklet blocklet : blocklets) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index 2f15263..69eb021 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -183,8 +183,10 @@ object CarbonReflectionUtils { sqlParser: Object, sparkSession: SparkSession): AstBuilder = { if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) { - createObject( - "org.apache.spark.sql.hive.CarbonSqlAstBuilder", + val className = sparkSession.sparkContext.conf.get( + CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME, + "org.apache.spark.sql.hive.CarbonSqlAstBuilder") + createObject(className, conf, sqlParser, sparkSession)._1.asInstanceOf[AstBuilder] } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 4a76bca..63e0834 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -125,7 +125,7 @@ object CarbonEnv { def getInstance(sparkSession: SparkSession): CarbonEnv = { if 
(sparkSession.isInstanceOf[CarbonSession]) { - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv() } else { var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession) if (carbonEnv == null) { @@ -205,7 +205,11 @@ object CarbonEnv { def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { var isRefreshed = false val carbonEnv = getInstance(sparkSession) - if (carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) { + val table = carbonEnv.carbonMetastore.getTableFromMetadataCache( + identifier.database.getOrElse("default"), identifier.table) + if (table.isEmpty || + (table.isDefined && carbonEnv.carbonMetastore + .checkSchemasModifiedTimeAndReloadTable(identifier))) { sparkSession.sessionState.catalog.refreshTable(identifier) DataMapStoreManager.getInstance(). clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath, http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala new file mode 100644 index 0000000..1c69309 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala @@ -0,0 +1,67 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.spark.sql.hive + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} +import org.apache.spark.sql.catalyst.expressions.Expression + +import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability} + +/** + * This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration, + * but are not defined in SessionCatalog or HiveSessionCatalog to give contract to the + * Concrete implementation classes. + * For example CarbonSessionCatalog defined in 2.1 and 2.2. + * + */ +@InterfaceAudience.Internal +@InterfaceStability.Stable +trait CarbonSessionCatalog { + /** + * implementation to be provided by each CarbonSessionCatalog based on on used ExternalCatalog + * + * @return + */ + def getClient(): org.apache.spark.sql.hive.client.HiveClient + + /** + * The method returns the CarbonEnv instance + * + * @return + */ + def getCarbonEnv(): CarbonEnv + + /** + * This is alternate way of getting partition information. It first fetches all partitions from + * hive and then apply filter instead of querying hive along with filters. 
+ * + * @param partitionFilters + * @param sparkSession + * @param identifier + * @return + */ + def getPartitionsAlternate(partitionFilters: Seq[Expression], sparkSession: SparkSession, + identifier: TableIdentifier): Seq[CatalogTablePartition] + + /** + * Update the storageformat with new location information + */ + def updateStorageLocation(path: Path, storage: CatalogStorageFormat): CatalogStorageFormat +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 45e956a..1c6756b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 4c2304d..c37e6fa 100644 --- 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -55,7 +55,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil * @param conf * @param hadoopConf */ -class CarbonSessionCatalog( +class CarbonHiveSessionCatalog( externalCatalog: HiveExternalCatalog, globalTempViewManager: GlobalTempViewManager, sparkSession: SparkSession, @@ -70,14 +70,20 @@ class CarbonSessionCatalog( functionResourceLoader, functionRegistry, conf, - hadoopConf) { + hadoopConf) with CarbonSessionCatalog { lazy val carbonEnv = { val env = new CarbonEnv env.init(sparkSession) env } - + /** + * return's the carbonEnv instance + * @return + */ + override def getCarbonEnv() : CarbonEnv = { + carbonEnv + } // Initialize all listeners to the Operation bus. CarbonEnv.init(sparkSession) @@ -138,7 +144,7 @@ class CarbonSessionCatalog( * * @return */ - def getClient(): org.apache.spark.sql.hive.client.HiveClient = { + override def getClient(): org.apache.spark.sql.hive.client.HiveClient = { sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive } @@ -187,7 +193,7 @@ class CarbonSessionCatalog( /** * Update the storageformat with new location information */ - def updateStorageLocation( + override def updateStorageLocation( path: Path, storage: CatalogStorageFormat): CatalogStorageFormat = { storage.copy(locationUri = Some(path.toString)) @@ -254,7 +260,7 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp * Internal catalog for managing table and database states. 
*/ override lazy val catalog = { - new CarbonSessionCatalog( + new CarbonHiveSessionCatalog( sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], sparkSession.sharedState.globalTempViewManager, sparkSession, http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index 2ffc7db..479c9ce 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -59,7 +59,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil * @param conf * @param hadoopConf */ -class CarbonSessionCatalog( +class CarbonHiveSessionCatalog( externalCatalog: HiveExternalCatalog, globalTempViewManager: GlobalTempViewManager, functionRegistry: FunctionRegistry, @@ -68,7 +68,7 @@ class CarbonSessionCatalog( hadoopConf: Configuration, parser: ParserInterface, functionResourceLoader: FunctionResourceLoader) - extends HiveSessionCatalog( + extends HiveSessionCatalog ( externalCatalog, globalTempViewManager, new HiveMetastoreCatalog(sparkSession), @@ -77,15 +77,18 @@ class CarbonSessionCatalog( hadoopConf, parser, functionResourceLoader - ) { + ) with CarbonSessionCatalog { lazy val carbonEnv = { val env = new CarbonEnv env.init(sparkSession) env } - - def getCarbonEnv() : CarbonEnv = { + /** + * return's the carbonEnv instance + * @return + */ + override def getCarbonEnv() : CarbonEnv = { carbonEnv } @@ -97,28 +100,9 @@ class CarbonSessionCatalog( override def lookupRelation(name: TableIdentifier): LogicalPlan = { val rtnRelation = super.lookupRelation(name) - var toRefreshRelation = false - 
rtnRelation match { - case SubqueryAlias(_, - LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) => - toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession) - case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) => - toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession) - case SubqueryAlias(_, relation) if - relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || - relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || - relation.getClass.getName.equals( - "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") => - val catalogTable = - CarbonReflectionUtils.getFieldOfCatalogTable( - "tableMeta", - relation).asInstanceOf[CatalogTable] - toRefreshRelation = - CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession) - case _ => - } - - if (toRefreshRelation) { + val isRelationRefreshed = + CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession) + if (isRelationRefreshed) { super.lookupRelation(name) } else { rtnRelation @@ -130,7 +114,7 @@ class CarbonSessionCatalog( * * @return */ - def getClient(): org.apache.spark.sql.hive.client.HiveClient = { + override def getClient(): org.apache.spark.sql.hive.client.HiveClient = { sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog .asInstanceOf[HiveExternalCatalog].client } @@ -157,21 +141,16 @@ class CarbonSessionCatalog( * @param identifier * @return */ - def getPartitionsAlternate(partitionFilters: Seq[Expression], + override def getPartitionsAlternate(partitionFilters: Seq[Expression], sparkSession: SparkSession, identifier: TableIdentifier) = { - val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) - ExternalCatalogUtils.prunePartitionsByFilter( - sparkSession.sessionState.catalog.getTableMetadata(identifier), - allPartitions, - partitionFilters, - sparkSession.sessionState.conf.sessionLocalTimeZone) + 
CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier) } /** * Update the storageformat with new location information */ - def updateStorageLocation( + override def updateStorageLocation( path: Path, storage: CatalogStorageFormat): CatalogStorageFormat = { storage.copy(locationUri = Some(path.toUri)) @@ -217,8 +196,8 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession, /** * Create a [[CarbonSessionCatalogBuild]]. */ - override protected lazy val catalog: CarbonSessionCatalog = { - val catalog = new CarbonSessionCatalog( + override protected lazy val catalog: CarbonHiveSessionCatalog = { + val catalog = new CarbonHiveSessionCatalog( externalCatalog, session.sharedState.globalTempViewManager, functionRegistry, @@ -280,6 +259,13 @@ class CarbonOptimizer( extends SparkOptimizer(catalog, conf, experimentalMethods) { override def execute(plan: LogicalPlan): LogicalPlan = { + val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan) + super.execute(transFormedPlan) + } +} + +object CarbonOptimizerUtil { + def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = { // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And // optimize whole plan at once. 
val transFormedPlan = plan.transform { @@ -309,10 +295,10 @@ class CarbonOptimizer( lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true lr } - In(value, Seq(ListQuery(tPlan, l.children , exprId))) + In(value, Seq(ListQuery(tPlan, l.children, exprId))) } } - super.execute(transFormedPlan) + transFormedPlan } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala new file mode 100644 index 0000000..ac702ea --- /dev/null +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala @@ -0,0 +1,97 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession} +import org.apache.spark.util.CarbonReflectionUtils +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.catalyst.expressions.Expression + +import org.apache.carbondata.common.logging.LogServiceFactory + +/** + * This class refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table. + */ +object CarbonSessionUtil { + + val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil") + + /** + * The method refreshes the cache entry + * + * @param rtnRelation [[LogicalPlan]] represents the given table or view. 
+ * @param name tableName + * @param sparkSession + * @return + */ + def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier) + (sparkSession: SparkSession): Boolean = { + var isRelationRefreshed = false + rtnRelation match { + case SubqueryAlias(_, + LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) + ) => + isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession) + case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) => + isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession) + case SubqueryAlias(_, relation) if + relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || + relation.getClass.getName + .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || + relation.getClass.getName.equals( + "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation" + ) => + val catalogTable = + CarbonReflectionUtils.getFieldOfCatalogTable( + "tableMeta", + relation + ).asInstanceOf[CatalogTable] + isRelationRefreshed = + CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession) + case _ => + } + isRelationRefreshed + } + + /** + * This is alternate way of getting partition information. It first fetches all partitions from + * hive and then apply filter instead of querying hive along with filters. 
+ * + * @param partitionFilters + * @param sparkSession + * @param identifier + * @return + */ + def prunePartitionsByFilter(partitionFilters: Seq[Expression], + sparkSession: SparkSession, + identifier: TableIdentifier): Seq[CatalogTablePartition] = { + val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) + ExternalCatalogUtils.prunePartitionsByFilter( + sparkSession.sessionState.catalog.getTableMetadata(identifier), + allPartitions, + partitionFilters, + sparkSession.sessionState.conf.sessionLocalTimeZone + ) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c200d83/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala index 169bcf2..ff64c05 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala @@ -18,13 +18,13 @@ package org.apache.spark.sql.common.util import org.apache.spark.sql.CarbonSession -import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonHiveSessionCatalog, HiveExternalCatalog} import org.apache.spark.sql.test.util.QueryTest class Spark2QueryTest extends QueryTest { - val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonHiveSessionCatalog] .getClient() } \ No newline at end of file
