Repository: carbondata
Updated Branches:
  refs/heads/master 29dc30280 -> 6b7217a8d
[CARBONDATA-1866] refactored CarbonLateDecodeRule to split different rules

This closes #1623


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6b7217a8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6b7217a8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6b7217a8

Branch: refs/heads/master
Commit: 6b7217a8d47aa2606859b92e4b03a5597b6083c9
Parents: 29dc302
Author: rahulforallp <[email protected]>
Authored: Wed Dec 6 15:22:44 2017 +0530
Committer: Venkata Ramana G <[email protected]>
Committed: Fri Dec 8 00:57:47 2017 +0530

----------------------------------------------------------------------
 .../spark/sql/optimizer/CarbonIUDRule.scala | 53 ++++++++
 .../sql/optimizer/CarbonLateDecodeRule.scala | 133 ++++++-------------
 .../sql/optimizer/CarbonUDFTransformRule.scala | 68 ++++++++++
 .../src/main/spark2.1/CarbonSessionState.scala | 22 ++-
 .../src/main/spark2.2/CarbonSessionState.scala | 6 +-
 5 files changed, 185 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
new file mode 100644
index 0000000..7300fe9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.optimizer + +import org.apache.spark.sql.ProjectForUpdate +import org.apache.spark.sql.catalyst.expressions.{NamedExpression, PredicateHelper} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand + +/** + * Rule specific for IUD operations + */ +class CarbonIUDRule extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = { + processPlan(plan) + } + + private def processPlan(plan: LogicalPlan): LogicalPlan = { + plan transform { + case ProjectForUpdate(table, cols, Seq(updatePlan)) => + var isTransformed = false + val newPlan = updatePlan transform { + case Project(pList, child) if !isTransformed => + val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList + .splitAt(pList.size - cols.size) + val diff = cols.diff(dest.map(_.name.toLowerCase)) + if (diff.nonEmpty) { + sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table ${ table.tableName }") + } + isTransformed = true + Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child) + } + CarbonProjectForUpdateCommand( + newPlan, table.tableIdentifier.database, table.tableIdentifier.table) + } + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 2e39f5e..764891b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -46,35 +46,14 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation */ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - private var relations: Seq[CarbonDecoderRelation] = _ - private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = { - plan collect { - case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - CarbonDecoderRelation(l.attributeMap, - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) - } - } + private var relations: Seq[CarbonDecoderRelation] = _ def apply(plan: LogicalPlan): LogicalPlan = { - relations = collectCarbonRelation(plan) - if (relations.nonEmpty && !isOptimized(plan)) { - // In case scalar subquery skip the transformation and update the flag. 
- if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) { - relations.foreach{carbonDecoderRelation => - if (carbonDecoderRelation.carbonRelation.isSubquery.nonEmpty) { - carbonDecoderRelation.carbonRelation.isSubquery.remove(0) - } - } - LOGGER.info("Skip CarbonOptimizer for scalar/predicate sub query") - return plan - } - LOGGER.info("Starting to optimize plan") - val iudPlan = processPlan(plan) - val udfTransformedPlan = pushDownUDFToJoinLeftRelation(iudPlan) + if (checkIfRuleNeedToBeApplied(plan, true)) { val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("") val queryStatistic = new QueryStatistic() - val result = transformCarbonPlan(udfTransformedPlan, relations) + val result = transformCarbonPlan(plan, relations) queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ", System.currentTimeMillis) recorder.recordStatistics(queryStatistic) @@ -86,62 +65,37 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { } } - private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = { - val output = plan.transform { - case proj@Project(cols, Join( - left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) => - var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty - var udfExists = false - val newCols = cols.map { - case a@Alias(s: ScalaUDF, name) - if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => - udfExists = true - projectionToBeAdded :+= a - AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId) - case other => other - } - if (udfExists) { - val newLeft = left match { - case Project(columns, logicalPlan) => - Project(columns ++ projectionToBeAdded, logicalPlan) - case filter: Filter => - Project(filter.output ++ projectionToBeAdded, filter) - case relation: LogicalRelation => - Project(relation.output ++ 
projectionToBeAdded, relation) - case other => other - } - Project(newCols, Join(newLeft, right, jointype, condition)) - } else { - proj + def checkIfRuleNeedToBeApplied(plan: LogicalPlan, removeSubQuery: Boolean = false): Boolean = { + relations = collectCarbonRelation(plan); + if (relations.nonEmpty && !isOptimized(plan)) { + // In case scalar subquery skip the transformation and update the flag. + if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) { + if (removeSubQuery) { + relations.foreach { carbonDecoderRelation => + if (carbonDecoderRelation.carbonRelation.isSubquery.nonEmpty) { + carbonDecoderRelation.carbonRelation.isSubquery.remove(0) + } + } } - case other => other + LOGGER.info("skip CarbonOptimizer for scalar/predicate sub query") + return false + } + true + } else { + LOGGER.info("skip CarbonOptimizer") + false } - output } - private def processPlan(plan: LogicalPlan): LogicalPlan = { - plan transform { - case ProjectForUpdate(table, cols, Seq(updatePlan)) => - var isTransformed = false - val newPlan = updatePlan transform { - case Project(pList, child) if !isTransformed => - val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList - .splitAt(pList.size - cols.size) - val diff = cols.diff(dest.map(_.name.toLowerCase)) - if (diff.nonEmpty) { - sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}") - } - isTransformed = true - Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child) - } - CarbonProjectForUpdateCommand( - newPlan, table.tableIdentifier.database, table.tableIdentifier.table) + private def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = { + plan collect { + case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + CarbonDecoderRelation(l.attributeMap, + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) } } - - def isOptimized(plan: LogicalPlan): Boolean = { + private def isOptimized(plan: LogicalPlan): 
Boolean = { plan find { case cd: CarbonDictionaryCatalystDecoder => true case other => false @@ -150,7 +104,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { case class ExtraNodeInfo(var hasCarbonRelation: Boolean) - def fillNodeInfo( + private def fillNodeInfo( plan: LogicalPlan, extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = { plan match { @@ -573,6 +527,22 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { updateProjection(updateTempDecoder(transFormedPlan, aliasMap, attrMap)) } + private def isDictionaryEncoded(attribute: Attribute, + attributeMap: util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation], + aliasMap: CarbonAliasDecoderRelation): Boolean = { + + val uattr = aliasMap.getOrElse(attribute, attribute) + val relation = Option(attributeMap.get(AttributeReferenceWrapper(uattr))) + if (relation.isDefined) { + relation.get.dictionaryMap.get(uattr.name) match { + case Some(true) => true + case _ => false + } + } else { + false + } + } + private def updateTempDecoder(plan: LogicalPlan, aliasMapOriginal: CarbonAliasDecoderRelation, attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]): @@ -818,21 +788,6 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { } } - private def isDictionaryEncoded(attr: Attribute, - attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation], - aliasMap: CarbonAliasDecoderRelation): Boolean = { - val uAttr = aliasMap.getOrElse(attr, attr) - val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr))) - if (relation.isDefined) { - relation.get.dictionaryMap.get(uAttr.name) match { - case Some(true) => true - case _ => false - } - } else { - false - } - } - def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = { var present = false plan collect { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
new file mode 100644
index 0000000..c080cd9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonUDFTransformRule.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, PredicateHelper, +ScalaUDF} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.StringType + +import org.apache.carbondata.core.constants.CarbonCommonConstants + +class CarbonUDFTransformRule extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = { + pushDownUDFToJoinLeftRelation(plan) + } + + private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = { + val output = plan.transform { + case proj@Project(cols, Join( + left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) => + var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty + var udfExists = false + val newCols = cols.map { + case a@Alias(s: ScalaUDF, name) + if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || + name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => + udfExists = true + projectionToBeAdded :+= a + AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId) + case other => other + } + if (udfExists) { + val newLeft = left match { + case Project(columns, logicalPlan) => + Project(columns ++ projectionToBeAdded, logicalPlan) + case filter: Filter => + Project(filter.output ++ projectionToBeAdded, filter) + case relation: LogicalRelation => + Project(relation.output ++ projectionToBeAdded, relation) + case other => other + } + Project(newCols, Join(newLeft, right, jointype, condition)) + } else { + proj + } + case other => other + } + output + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/spark2.1/CarbonSessionState.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index 7113e63..8900847 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hive import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession} -import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery} @@ -28,12 +26,14 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} +import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.optimizer.CarbonLateDecodeRule +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, 
ExperimentalMethods, SparkSession, Strategy} import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier @@ -144,13 +144,23 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) - experimentalMethods.extraStrategies = + experimentalMethods.extraStrategies = extraStrategies + + experimentalMethods.extraOptimizations = extraOptimizations + + def extraStrategies: Seq[Strategy] = { Seq( new StreamingTableStrategy(sparkSession), new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession) ) - experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule) + } + + def extraOptimizations: Seq[Rule[LogicalPlan]] = { + Seq(new CarbonIUDRule, + new CarbonUDFTransformRule, + new CarbonLateDecodeRule) + } override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b7217a8/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index e10feb1..2ba6d09 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStr import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.{SQLConf, SessionState} -import org.apache.spark.sql.optimizer.CarbonLateDecodeRule +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} import 
org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} import org.apache.spark.sql.types.DecimalType @@ -170,7 +170,9 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession, new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession) ) - experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule) + experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule, + new CarbonUDFTransformRule, + new CarbonLateDecodeRule) /** * Internal catalog for managing table and database states.
