Repository: carbondata Updated Branches: refs/heads/master 49763b72b -> d19f01855
[CARBONDATA-1864] Using org.apache.spark.SPARK_VERSION instead of sparkSession.version Using org.apache.spark.SPARK_VERSION instead of sparkSession.version This closes #1620 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d19f0185 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d19f0185 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d19f0185 Branch: refs/heads/master Commit: d19f018555208300f67369610acd53851da4d00a Parents: 49763b7 Author: QiangCai <[email protected]> Authored: Wed Dec 6 10:23:15 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Dec 7 14:12:59 2017 +0800 ---------------------------------------------------------------------- .../optimizer/CarbonDecoderOptimizerHelper.scala | 4 ++-- .../apache/spark/util/CarbonReflectionUtils.scala | 18 +++++++++--------- .../spark/sql/hive/CarbonAnalysisRules.scala | 8 ++------ .../spark/sql/hive/CarbonFileMetastore.scala | 7 +++---- .../spark/sql/hive/CarbonPreAggregateRules.scala | 3 ++- .../spark/sql/parser/CarbonSpark2SqlParser.scala | 15 +++------------ 6 files changed, 21 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala index 886f27c..fee4b66 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala @@ -22,7 +22,7 @@ import java.util import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.sql.SparkSession +import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.util.CarbonReflectionUtils @@ -87,7 +87,7 @@ class CarbonDecoderProcessor { nodeList.add(ArrayCarbonNode(nodeListSeq)) case e: UnaryNode => process(e.child, nodeList) case i: InsertIntoTable => - val version = SparkSession.getActiveSession.get.version + val version = SPARK_VERSION val child: LogicalPlan = if (version.startsWith("2.1")) { CarbonReflectionUtils.getField("child", i).asInstanceOf[LogicalPlan] http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index 6864495..9a3f28e 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import scala.reflect.runtime._ import scala.reflect.runtime.universe._ +import org.apache.spark.SPARK_VERSION import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier @@ -56,20 +57,19 @@ object CarbonReflectionUtils { def getUnresolvedRelation( tableIdentifier: TableIdentifier, - version: String, tableAlias: Option[String] = None): UnresolvedRelation = { val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation" - if (version.startsWith("2.1")) { + if (SPARK_VERSION.startsWith("2.1")) { createObject( className, tableIdentifier, tableAlias)._1.asInstanceOf[UnresolvedRelation] - } else if (version.startsWith("2.2")) { + } else if (SPARK_VERSION.startsWith("2.2")) { createObject( className, tableIdentifier)._1.asInstanceOf[UnresolvedRelation] } else { - throw new UnsupportedOperationException(s"Unsupported Spark version $version") + throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION") } } @@ -77,13 +77,13 @@ object CarbonReflectionUtils { relation: LogicalPlan, view: Option[TableIdentifier]): SubqueryAlias = { val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias" - if (sparkSession.version.startsWith("2.1")) { + if (SPARK_VERSION.startsWith("2.1")) { createObject( className, alias.getOrElse(""), relation, Option(view))._1.asInstanceOf[SubqueryAlias] - } else if (sparkSession.version.startsWith("2.2")) { + } else if (SPARK_VERSION.startsWith("2.2")) { createObject( className, alias.getOrElse(""), @@ -130,7 +130,7 @@ object CarbonReflectionUtils { def getAstBuilder(conf: Object, sqlParser: Object, sparkSession: SparkSession): AstBuilder = { - if (sparkSession.version.startsWith("2.1") || sparkSession.version.startsWith("2.2")) { + if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) { createObject( "org.apache.spark.sql.hive.CarbonSqlAstBuilder", conf, @@ -141,12 +141,12 @@ object CarbonReflectionUtils { } def getSessionState(sparkContext: SparkContext, carbonSession: Object): Any = { - if (sparkContext.version.startsWith("2.1")) { + if (SPARK_VERSION.startsWith("2.1")) { val className = sparkContext.conf.get( CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, "org.apache.spark.sql.hive.CarbonSessionState") createObject(className, carbonSession)._1 - } else if (sparkContext.version.startsWith("2.2")) { + } else if (SPARK_VERSION.startsWith("2.2")) { val className = sparkContext.conf.get( CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, "org.apache.spark.sql.hive.CarbonSessionStateBuilder") http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index 0b3a2b3..b595896 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.hive +import org.apache.spark.SPARK_VERSION import org.apache.spark.sql._ import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation -import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.Inner @@ -127,11 +127,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica } else { updatedSelectPlan } - val destinationTable = - CarbonReflectionUtils.getUnresolvedRelation( - table.tableIdentifier, - sparkSession.version, - alias) + val destinationTable = CarbonReflectionUtils.getUnresolvedRelation(table.tableIdentifier, alias) ProjectForUpdate(destinationTable, columns, Seq(finalPlan)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 5078259..cdbdb10 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -23,15 +23,14 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.permission.{FsAction, FsPermission} -import org.apache.spark.sql.CarbonDatasourceHadoopRelation +import org.apache.spark.SPARK_VERSION +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonSource, SparkSession} import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias} -import org.apache.spark.sql.CarbonSource import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.SparkSession import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.logging.LogServiceFactory @@ -147,7 +146,7 @@ class CarbonFileMetastore extends CarbonMetaStore { case LogicalRelation( carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => carbonDatasourceHadoopRelation.carbonRelation - case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.startsWith("2.2") => + case SubqueryAlias(_, c: CatalogRelation) if SPARK_VERSION.startsWith("2.2") => val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable( "tableMeta", c).asInstanceOf[CatalogTable] http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 2b74ed7..426048f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SPARK_VERSION import org.apache.spark.sql._ import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias import org.apache.spark.sql.catalyst.TableIdentifier @@ -1201,7 +1202,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi case attr => attr } } - val version = sparkSession.version + val version = SPARK_VERSION val newChild: LogicalPlan = if (newChildOutput == child.output) { if (version.startsWith("2.1")) { CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan] http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index b01c6d9..343db49 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.parser import scala.collection.mutable import scala.language.implicitConversions -import org.apache.spark.sql.{DeleteRecords, SparkSession, UpdateTable} +import org.apache.spark.sql.{DeleteRecords, UpdateTable} import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -244,12 +244,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } // Use Reflection to choose between Spark2.1 and Spark2.2 // Move UnresolvedRelation(tableIdentifier, tableAlias) to reflection. - val unresolvedrelation = - CarbonReflectionUtils.getUnresolvedRelation( - tableIdentifier, - SparkSession.getActiveSession.get.version, - tableAlias) - unresolvedrelation + CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias) } } @@ -273,11 +268,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { // Use Reflection to choose between Spark2.1 and Spark2.2 // Move (UnresolvedRelation(tableIdent, alias), tableIdent, alias) to reflection. - val unresolvedRelation = - CarbonReflectionUtils.getUnresolvedRelation( - tableIdentifier, - SparkSession.getActiveSession.get.version, - alias) + val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias) (unresolvedRelation, tableIdent, alias, tableIdentifier) }
