[
https://issues.apache.org/jira/browse/CARBONDATA-50?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15381349#comment-15381349
]
ASF GitHub Bot commented on CARBONDATA-50:
------------------------------------------
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/35#discussion_r71082582
--- Diff:
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
---
@@ -37,427 +36,461 @@ import org.carbondata.spark.CarbonFilters
/**
* Carbon Optimizer to add dictionary decoder.
*/
-class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
- extends Optimizer with PredicateHelper {
+object CarbonOptimizer {
- val batches = Nil
+ def optimizer(optimizer: Optimizer, conf: CarbonSQLConf, version:
String): Optimizer = {
+
CodeGenerateFactory.getInstance().optimizerFactory.createOptimizer(optimizer,
conf)
+ }
- override def execute(plan: LogicalPlan): LogicalPlan = {
+ def execute(plan: LogicalPlan, optimizer: Optimizer): LogicalPlan = {
val executedPlan: LogicalPlan = optimizer.execute(plan)
- val relations = collectCarbonRelation(plan)
+ val relations = CarbonOptimizer.collectCarbonRelation(plan)
if (relations.nonEmpty) {
- new ResolveCarbonFunctions(relations)(executedPlan)
+ new ResolveCarbonFunctions(relations).apply(executedPlan)
} else {
executedPlan
}
}
- /**
- * It does two jobs. 1. Change the datatype for dictionary encoded
column 2. Add the dictionary
- * decoder plan.
- */
- class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
extends
- Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = {
- transformCarbonPlan(plan, relations)
+ // get the carbon relation from plan.
+ def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation]
= {
+ plan collect {
+ case l: LogicalRelation if
l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+ CarbonDecoderRelation(l.attributeMap,
l.relation.asInstanceOf[CarbonDatasourceRelation])
}
+ }
+}
+
+/**
+ * It does two jobs. 1. Change the datatype for dictionary encoded column
2. Add the dictionary
+ * decoder plan.
+ */
+class ResolveCarbonFunctions(
+ relations: Seq[CarbonDecoderRelation])
+ extends Rule[LogicalPlan] with PredicateHelper {
- /**
- * Steps for changing the plan.
- * 1. It finds out the join condition columns and dimension aggregate
columns which are need to
- * be decoded just before that plan executes.
- * 2. Plan starts transform by adding the decoder to the plan where it
needs the decoded data
- * like dimension aggregate columns decoder under aggregator and join
condition decoder under
- * join children.
- */
- def transformCarbonPlan(plan: LogicalPlan,
- relations: Seq[CarbonDecoderRelation]): LogicalPlan = {
- var decoder = false
- val aliasMap = CarbonAliasDecoderRelation()
- // collect alias information before hand.
- collectInformationOnAttributes(plan, aliasMap)
- val transFormedPlan =
- plan transformDown {
- case cd: CarbonDictionaryTempDecoder if cd.isOuter =>
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ transformCarbonPlan(plan, relations)
+ }
+
+ /**
+ * Steps for changing the plan.
+ * 1. It finds out the join condition columns and dimension aggregate
columns which are need to
+ * be decoded just before that plan executes.
+ * 2. Plan starts transform by adding the decoder to the plan where it
needs the decoded data
+ * like dimension aggregate columns decoder under aggregator and join
condition decoder under
+ * join children.
+ */
+ def transformCarbonPlan(plan: LogicalPlan,
+ relations: Seq[CarbonDecoderRelation]): LogicalPlan = {
+ var decoder = false
+ val aliasMap = CarbonAliasDecoderRelation()
+ // collect alias information before hand.
+ collectInformationOnAttributes(plan, aliasMap)
+ val transFormedPlan =
+ plan transformDown {
+ case cd: CarbonDictionaryTempDecoder if cd.isOuter =>
+ decoder = true
+ cd
+ case sort: Sort if
!sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+ val attrsOnSort = new util.HashSet[Attribute]()
+ sort.order.map { s =>
+ s.collect {
+ case attr: AttributeReference
+ if isDictionaryEncoded(attr, relations, aliasMap) =>
+ attrsOnSort.add(aliasMap.getOrElse(attr, attr))
+ }
+ }
+ var child = sort.child
+ if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
+ child = CarbonDictionaryTempDecoder(attrsOnSort,
+ new util.HashSet[Attribute](), sort.child)
+ }
+ if (!decoder) {
decoder = true
- cd
- case sort: Sort if
!sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnSort = new util.HashSet[Attribute]()
- sort.order.map { s =>
- s.collect {
+ CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+ new util.HashSet[Attribute](),
+ Sort(sort.order, sort.global, child),
+ isOuter = true)
+ } else {
+ Sort(sort.order, sort.global, child)
+ }
+
+ case agg: Aggregate if
!agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+ val attrsOndimAggs = new util.HashSet[Attribute]
+ agg.aggregateExpressions.map {
+ case attr: AttributeReference =>
+ case a@Alias(attr: AttributeReference, name) =>
aliasMap.put(a.toAttribute, attr)
+ case aggExp: AggregateExpression =>
+ aggExp.transform {
+ case aggExp: AggregateExpression =>
+ collectDimensionAggregates(aggExp, attrsOndimAggs,
aliasMap)
+ aggExp
+ case a@Alias(attr: Attribute, name) =>
+ aliasMap.put(a.toAttribute, attr)
+ a
+ }
+ case others =>
+ others.collect {
+ case a@ Alias(attr: AttributeReference, _) =>
aliasMap.put(a.toAttribute, attr)
+ case a@Alias(exp, _) if
!exp.isInstanceOf[AttributeReference] =>
+ aliasMap.put(a.toAttribute, new AttributeReference("",
StringType)())
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnSort.add(aliasMap.getOrElse(attr, attr))
+ attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
}
- }
- var child = sort.child
- if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
- child = CarbonDictionaryTempDecoder(attrsOnSort,
- new util.HashSet[Attribute](), sort.child)
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
- Sort(sort.order, sort.global, child),
- isOuter = true)
- } else {
- Sort(sort.order, sort.global, child)
- }
-
- case agg: Aggregate if
!agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOndimAggs = new util.HashSet[Attribute]
- agg.aggregateExpressions.map {
+ }
+ var child = agg.child
+ // Incase if the child also aggregate then push down decoder to
child
+ if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
+ child = CarbonDictionaryTempDecoder(attrsOndimAggs,
+ new util.HashSet[Attribute](),
+ agg.child)
+ }
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+ new util.HashSet[Attribute](),
+ Aggregate(agg.groupingExpressions, agg.aggregateExpressions,
child),
+ isOuter = true)
+ } else {
+ Aggregate(agg.groupingExpressions, agg.aggregateExpressions,
child)
+ }
+ case expand: Expand if
!expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+ val attrsOnExpand = new util.HashSet[Attribute]
+ expand.projections.map {s =>
+ s.map {
case attr: AttributeReference =>
case a@Alias(attr: AttributeReference, name) =>
aliasMap.put(a.toAttribute, attr)
- case aggExp: AggregateExpression =>
- aggExp.transform {
- case aggExp: AggregateExpression =>
- collectDimensionAggregates(aggExp, attrsOndimAggs,
aliasMap)
- aggExp
- case a@Alias(attr: Attribute, name) =>
- aliasMap.put(a.toAttribute, attr)
- a
- }
case others =>
others.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+ attrsOnExpand.add(aliasMap.getOrElse(attr, attr))
}
}
- var child = agg.child
- // Incase if the child also aggregate then push down decoder
to child
- if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
- child = CarbonDictionaryTempDecoder(attrsOndimAggs,
- new util.HashSet[Attribute](),
- agg.child)
- }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
- Aggregate(agg.groupingExpressions,
agg.aggregateExpressions, child),
- isOuter = true)
- } else {
- Aggregate(agg.groupingExpressions, agg.aggregateExpressions,
child)
- }
-
- case filter: Filter if
!filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnConds = new util.HashSet[Attribute]
- CarbonFilters
- .selectFilters(splitConjunctivePredicates(filter.condition),
attrsOnConds, aliasMap)
-
- var child = filter.child
- if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
- child = CarbonDictionaryTempDecoder(attrsOnConds,
- new util.HashSet[Attribute](),
- filter.child)
- }
+ }
+ var child = expand.child
+ if (attrsOnExpand.size() > 0 && !child.isInstanceOf[Expand]) {
+ child = CarbonDictionaryTempDecoder(attrsOnExpand,
+ new util.HashSet[Attribute](),
+ expand.child)
+ }
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+ new util.HashSet[Attribute](),
+
CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child),
+ isOuter = true)
+ } else {
+
CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child)
+ }
+ case filter: Filter if
!filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+ val attrsOnConds = new util.HashSet[Attribute]
+ CarbonFilters
+ .selectFilters(splitConjunctivePredicates(filter.condition),
attrsOnConds, aliasMap)
+
+ var child = filter.child
+ if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
+ child = CarbonDictionaryTempDecoder(attrsOnConds,
+ new util.HashSet[Attribute](),
+ filter.child)
+ }
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
- Filter(filter.condition, child),
- isOuter = true)
- } else {
- Filter(filter.condition, child)
- }
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+ new util.HashSet[Attribute](),
+ Filter(filter.condition, child),
+ isOuter = true)
+ } else {
+ Filter(filter.condition, child)
+ }
- case j: Join
+ case j: Join
if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
- val attrsOnJoin = new util.HashSet[Attribute]
- j.condition match {
- case Some(expression) =>
- expression.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnJoin.add(aliasMap.getOrElse(attr, attr))
- }
- case _ =>
- }
+ val attrsOnJoin = new util.HashSet[Attribute]
+ j.condition match {
+ case Some(expression) =>
+ expression.collect {
+ case attr: AttributeReference
+ if isDictionaryEncoded(attr, relations, aliasMap) =>
+ attrsOnJoin.add(aliasMap.getOrElse(attr, attr))
+ }
+ case _ =>
+ }
- val leftCondAttrs = new util.HashSet[Attribute]
- val rightCondAttrs = new util.HashSet[Attribute]
- if (attrsOnJoin.size() > 0) {
+ val leftCondAttrs = new util.HashSet[Attribute]
+ val rightCondAttrs = new util.HashSet[Attribute]
+ if (attrsOnJoin.size() > 0) {
- attrsOnJoin.asScala.map { attr =>
- if (qualifierPresence(j.left, attr)) {
- leftCondAttrs.add(attr)
- }
- if (qualifierPresence(j.right, attr)) {
- rightCondAttrs.add(attr)
- }
- }
- var leftPlan = j.left
- var rightPlan = j.right
- if (leftCondAttrs.size() > 0 &&
- !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder])
{
- leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
- new util.HashSet[Attribute](),
- j.left)
- }
- if (rightCondAttrs.size() > 0 &&
-
!rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
- rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
- new util.HashSet[Attribute](),
- j.right)
+ attrsOnJoin.asScala.map { attr =>
+ if (qualifierPresence(j.left, attr)) {
+ leftCondAttrs.add(attr)
}
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
- Join(leftPlan, rightPlan, j.joinType, j.condition),
- isOuter = true)
- } else {
- Join(leftPlan, rightPlan, j.joinType, j.condition)
+ if (qualifierPresence(j.right, attr)) {
+ rightCondAttrs.add(attr)
}
- } else {
- j
}
-
- case p: Project
- if relations.nonEmpty &&
!p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnProjects = new util.HashSet[Attribute]
- p.projectList.map {
- case attr: AttributeReference =>
- case a@Alias(attr: AttributeReference, name) =>
aliasMap.put(a.toAttribute, attr)
- case others =>
- others.collect {
- case attr: AttributeReference
- if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
- }
+ var leftPlan = j.left
+ var rightPlan = j.right
+ if (leftCondAttrs.size() > 0 &&
+ !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+ leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
+ new util.HashSet[Attribute](),
+ j.left)
}
- var child = p.child
- if (attrsOnProjects.size() > 0 &&
!child.isInstanceOf[Project]) {
- child = CarbonDictionaryTempDecoder(attrsOnProjects,
+ if (rightCondAttrs.size() > 0 &&
+ !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+ rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
new util.HashSet[Attribute](),
- p.child)
+ j.right)
}
if (!decoder) {
decoder = true
CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
new util.HashSet[Attribute](),
- Project(p.projectList, child),
+ Join(leftPlan, rightPlan, j.joinType, j.condition),
isOuter = true)
} else {
- Project(p.projectList, child)
- }
-
- case l@LogicalRelation(carbonRelation: CarbonDatasourceRelation,
_) =>
- if (!decoder) {
- decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](), l, isOuter = true)
- } else {
- l
+ Join(leftPlan, rightPlan, j.joinType, j.condition)
}
-
- case others => others
- }
-
- val processor = new CarbonDecoderProcessor
- processor.updateDecoders(processor.getDecoderList(transFormedPlan))
- updateProjection(updateTempDecoder(transFormedPlan, aliasMap))
- }
-
- private def updateTempDecoder(plan: LogicalPlan,
- aliasMap: CarbonAliasDecoderRelation): LogicalPlan = {
- var allAttrsNotDecode: util.Set[Attribute] = new
util.HashSet[Attribute]()
- val marker = new CarbonPlanMarker
- plan transformDown {
- case cd: CarbonDictionaryTempDecoder if !cd.processed =>
- cd.processed = true
- allAttrsNotDecode = cd.attrsNotDecode
- marker.pushMarker(cd.attrsNotDecode)
- if (cd.isOuter) {
- CarbonDictionaryCatalystDecoder(relations,
- ExcludeProfile(cd.attrsNotDecode.asScala.toSeq),
- aliasMap,
- isOuter = true,
- cd.child)
} else {
- CarbonDictionaryCatalystDecoder(relations,
- IncludeProfile(cd.attrList.asScala.toSeq),
- aliasMap,
- isOuter = false,
- cd.child)
+ j
}
- case cd: CarbonDictionaryCatalystDecoder =>
- cd
- case sort: Sort =>
- val sortExprs = sort.order.map { s =>
- s.transform {
- case attr: AttributeReference =>
- updateDataType(attr, relations, allAttrsNotDecode,
aliasMap)
- }.asInstanceOf[SortOrder]
- }
- Sort(sortExprs, sort.global, sort.child)
- case agg: Aggregate if
!agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
- val aggExps = agg.aggregateExpressions.map { aggExp =>
- aggExp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, relations, allAttrsNotDecode,
aliasMap)
- }
- }.asInstanceOf[Seq[NamedExpression]]
- val grpExps = agg.groupingExpressions.map { gexp =>
- gexp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, relations, allAttrsNotDecode,
aliasMap)
- }
- }
- Aggregate(grpExps, aggExps, agg.child)
- case filter: Filter =>
- val filterExps = filter.condition transform {
+ case p: Project
+ if relations.nonEmpty &&
!p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
+ val attrsOnProjects = new util.HashSet[Attribute]
+ p.projectList.map {
case attr: AttributeReference =>
- updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
- case l: Literal => FakeCarbonCast(l, l.dataType)
+ case a@Alias(attr: AttributeReference, name) =>
aliasMap.put(a.toAttribute, attr)
+ case others =>
+ others.collect {
+ case attr: AttributeReference
+ if isDictionaryEncoded(attr, relations, aliasMap) =>
+ attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+ }
}
- Filter(filterExps, filter.child)
- case j: Join =>
- marker.pushJoinMarker(allAttrsNotDecode)
- j
- case p: Project if relations.nonEmpty =>
- val prExps = p.projectList.map { prExp =>
- prExp.transform {
- case attr: AttributeReference =>
- updateDataType(attr, relations, allAttrsNotDecode,
aliasMap)
- }
- }.asInstanceOf[Seq[NamedExpression]]
- Project(prExps, p.child)
- case l@LogicalRelation(carbonRelation: CarbonDatasourceRelation,
_) =>
- allAttrsNotDecode = marker.revokeJoin()
- l
- case others => others
- }
- }
-
- private def updateProjection(plan: LogicalPlan): LogicalPlan = {
- val transFormedPlan = plan transform {
- case p@Project(projectList: Seq[NamedExpression], cd:
CarbonDictionaryCatalystDecoder) =>
- if (cd.child.isInstanceOf[Filter] ||
cd.child.isInstanceOf[LogicalRelation]) {
- Project(projectList: Seq[NamedExpression], cd.child)
+ var child = p.child
+ if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
+ child = CarbonDictionaryTempDecoder(attrsOnProjects,
+ new util.HashSet[Attribute](),
+ p.child)
+ }
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+ new util.HashSet[Attribute](),
+ Project(p.projectList, child),
+ isOuter = true)
} else {
- p
+ Project(p.projectList, child)
}
- case f@Filter(condition: Expression, cd:
CarbonDictionaryCatalystDecoder) =>
- if (cd.child.isInstanceOf[Project] ||
cd.child.isInstanceOf[LogicalRelation]) {
- Filter(condition, cd.child)
+
+ case l: LogicalRelation if
l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
+ new util.HashSet[Attribute](), l, isOuter = true)
} else {
- f
+ l
}
+
+ case others => others
}
- // Remove unnecessary decoders
- val finalPlan = transFormedPlan transform {
- case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
- if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
child
- }
- finalPlan
- }
- private def collectInformationOnAttributes(plan: LogicalPlan,
- aliasMap: CarbonAliasDecoderRelation) {
- plan transformUp {
- case project: Project =>
- project.projectList.map { p =>
- p transform {
- case a@Alias(attr: Attribute, name) =>
- aliasMap.put(a.toAttribute, attr)
- a
- case a@Alias(_, name) =>
- aliasMap.put(a.toAttribute, new AttributeReference("",
StringType)())
- a
- }
+ val processor = new CarbonDecoderProcessor
+ processor.updateDecoders(processor.getDecoderList(transFormedPlan))
+ updateProjection(updateTempDecoder(transFormedPlan, aliasMap))
+ }
+
+ private def updateTempDecoder(plan: LogicalPlan,
+ aliasMap: CarbonAliasDecoderRelation): LogicalPlan = {
+ var allAttrsNotDecode: util.Set[Attribute] = new
util.HashSet[Attribute]()
+ val marker = new CarbonPlanMarker
+ plan transformDown {
+ case cd: CarbonDictionaryTempDecoder if !cd.processed =>
+ cd.processed = true
+ allAttrsNotDecode = cd.attrsNotDecode
+ marker.pushMarker(cd.attrsNotDecode)
+ if (cd.isOuter) {
+ CarbonDictionaryCatalystDecoder(relations,
+ ExcludeProfile(cd.attrsNotDecode.asScala.toSeq),
+ aliasMap,
+ isOuter = true,
+ cd.child)
+ } else {
+ CarbonDictionaryCatalystDecoder(relations,
+ IncludeProfile(cd.attrList.asScala.toSeq),
+ aliasMap,
+ isOuter = false,
+ cd.child)
+ }
+ case cd: CarbonDictionaryCatalystDecoder =>
+ cd
+ case sort: Sort =>
+ val sortExprs = sort.order.map { s =>
+ s.transform {
+ case attr: AttributeReference =>
+ updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
+ }.asInstanceOf[SortOrder]
+ }
+ Sort(sortExprs, sort.global, sort.child)
+ case agg: Aggregate if
!agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
+ val aggExps = agg.aggregateExpressions.map { aggExp =>
+ aggExp.transform {
+ case attr: AttributeReference =>
+ updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
}
- project
- case agg: Aggregate =>
- agg.aggregateExpressions.map { aggExp =>
- aggExp.transform {
- case a@Alias(attr: Attribute, name) =>
- aliasMap.put(a.toAttribute, attr)
- a
- case a@Alias(_, name) =>
- aliasMap.put(a.toAttribute, new AttributeReference("",
StringType)())
- a
- }
+ }.asInstanceOf[Seq[NamedExpression]]
+
+ val grpExps = agg.groupingExpressions.map { gexp =>
+ gexp.transform {
+ case attr: AttributeReference =>
+ updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
}
- agg
- }
+ }
+ Aggregate(grpExps, aggExps, agg.child)
+ case filter: Filter =>
+ val filterExps = filter.condition transform {
+ case attr: AttributeReference =>
+ updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
+ }
+ Filter(filterExps, filter.child)
+ case j: Join =>
+ marker.pushJoinMarker(allAttrsNotDecode)
+ j
+ case p: Project if relations.nonEmpty =>
+ val prExps = p.projectList.map { prExp =>
+ prExp.transform {
+ case attr: AttributeReference =>
+ updateDataType(attr, relations, allAttrsNotDecode, aliasMap)
+ }
+ }.asInstanceOf[Seq[NamedExpression]]
+ Project(prExps, p.child)
+ case l: LogicalRelation if
l.relation.isInstanceOf[CarbonDatasourceRelation] =>
+ allAttrsNotDecode = marker.revokeJoin()
+ l
+ case others => others
}
+ }
- // Collect aggregates on dimensions so that we can add decoder to it.
- private def collectDimensionAggregates(aggExp: AggregateExpression,
- attrsOndimAggs: util.HashSet[Attribute],
- aliasMap: CarbonAliasDecoderRelation) {
- aggExp collect {
- case attr: AttributeReference if isDictionaryEncoded(attr,
relations, aliasMap) =>
- attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
- case a@Alias(attr: Attribute, name) => aliasMap.put(a.toAttribute,
attr)
- }
+ private def updateProjection(plan: LogicalPlan): LogicalPlan = {
+ val transFormedPlan = plan transform {
+ case p@Project(projectList: Seq[NamedExpression], cd:
CarbonDictionaryCatalystDecoder) =>
+ if (cd.child.isInstanceOf[Filter] ||
cd.child.isInstanceOf[LogicalRelation]) {
+ Project(projectList: Seq[NamedExpression], cd.child)
+ } else {
+ p
+ }
+ case f@Filter(condition: Expression, cd:
CarbonDictionaryCatalystDecoder) =>
+ if (cd.child.isInstanceOf[Project] ||
cd.child.isInstanceOf[LogicalRelation]) {
+ Filter(condition, cd.child)
+ } else {
+ f
+ }
}
+ // Remove unnecessary decoders
+ val finalPlan = transFormedPlan transform {
+ case CarbonDictionaryCatalystDecoder(_, profile, _, false, child)
+ if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
+ child
+ }
+ finalPlan
+ }
- /**
- * Update the attribute datatype with [IntegerType] if the carbon
column is encoded with
- * dictionary.
- *
- */
- private def updateDataType(attr: Attribute,
- relations: Seq[CarbonDecoderRelation],
- allAttrsNotDecode: util.Set[Attribute],
- aliasMap: CarbonAliasDecoderRelation) = {
- val uAttr = aliasMap.getOrElse(attr, attr)
- val relation = relations.find(p => p.contains(uAttr))
- if (relation.isDefined) {
-
relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name)
match {
- case Some(true) if !allAttrsNotDecode.asScala.exists(p =>
p.name.equals(uAttr.name)) =>
- val newAttr = AttributeReference(attr.name,
- IntegerType,
- attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifiers)
- relation.get.addAttribute(newAttr)
- newAttr
- case _ => attr
+ private def collectInformationOnAttributes(plan: LogicalPlan,
+ aliasMap: CarbonAliasDecoderRelation) {
+ plan transformUp {
+ case project: Project =>
+ project.projectList.map { p =>
+ p transform {
+ case a@Alias(attr: Attribute, name) =>
+ aliasMap.put(a.toAttribute, attr)
+ a
+ case a@Alias(_, name) =>
+ aliasMap.put(a.toAttribute, new AttributeReference("",
StringType)())
+ a
+ }
}
- } else {
- attr
- }
+ project
+ case agg: Aggregate =>
+ agg.aggregateExpressions.map { aggExp =>
+ aggExp.transform {
+ case a@Alias(attr: Attribute, name) =>
+ aliasMap.put(a.toAttribute, attr)
+ a
+ case a@Alias(_, name) =>
+ aliasMap.put(a.toAttribute, new AttributeReference("",
StringType)())
+ a
+ }
+ }
+ agg
}
+ }
- private def isDictionaryEncoded(attr: Attribute,
- relations: Seq[CarbonDecoderRelation],
- aliasMap: CarbonAliasDecoderRelation): Boolean = {
- val uAttr = aliasMap.getOrElse(attr, attr)
- val relation = relations.find(p => p.contains(uAttr))
- if (relation.isDefined) {
-
relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name)
match {
- case Some(true) => true
- case _ => false
- }
- } else {
- false
+ // Collect aggregates on dimensions so that we can add decoder to it.
+ private def collectDimensionAggregates(aggExp: AggregateExpression,
+ attrsOndimAggs: util.HashSet[Attribute],
+ aliasMap: CarbonAliasDecoderRelation) {
+ aggExp collect {
+ case attr: AttributeReference if isDictionaryEncoded(attr,
relations, aliasMap) =>
+ attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+ case a@Alias(attr: Attribute, name) => aliasMap.put(a.toAttribute,
attr)
+ }
+ }
+
+ /**
+ * Update the attribute datatype with [IntegerType] if the carbon column
is encoded with
+ * dictionary.
+ *
+ */
+ private def updateDataType(attr: Attribute,
+ relations: Seq[CarbonDecoderRelation],
+ allAttrsNotDecode: util.Set[Attribute],
+ aliasMap: CarbonAliasDecoderRelation) = {
--- End diff --
ok
> Support Spark 1.6.2 in CarbonData
> ---------------------------------
>
> Key: CARBONDATA-50
> URL: https://issues.apache.org/jira/browse/CARBONDATA-50
> Project: CarbonData
> Issue Type: New Feature
> Reporter: Ravindra Pesala
> Assignee: Ravindra Pesala
>
> CarbonData currently does not support the latest Spark version, 1.6.2. It should be
> supported through a new Maven profile, built as shown below:
> {code}
> mvn clean -Pspark-1.6.2 package
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)