http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala deleted file mode 100644 index 914203f..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala +++ /dev/null @@ -1,862 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.optimizer - -import java.util - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command.ProjectForUpdateCommand -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.types.{IntegerType, StringType} - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.stats.QueryStatistic -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory -import org.apache.carbondata.spark.{CarbonAliasDecoderRelation, CarbonFilters} - -/** - * Carbon Optimizer to add dictionary decoder. - */ -object CarbonOptimizer { - - def optimizer(optimizer: Optimizer, conf: CarbonSQLConf, version: String): Optimizer = { - CodeGenerateFactory.getInstance().optimizerFactory.createOptimizer(optimizer, conf) - } - - def execute(plan: LogicalPlan, optimizer: Optimizer): LogicalPlan = { - val executedPlan: LogicalPlan = optimizer.execute(plan) - val relations = CarbonOptimizer.collectCarbonRelation(plan) - if (relations.nonEmpty) { - new ResolveCarbonFunctions(relations).apply(executedPlan) - } else { - executedPlan - } - } - - // get the carbon relation from plan. - def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = { - plan collect { - case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => - CarbonDecoderRelation(l.attributeMap, l.relation.asInstanceOf[CarbonDatasourceRelation]) - } - } -} - -/** - * It does two jobs. 1. Change the datatype for dictionary encoded column 2. Add the dictionary - * decoder plan. - */ -class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) - extends Rule[LogicalPlan] with PredicateHelper { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - def apply(logicalPlan: LogicalPlan): LogicalPlan = { - if (relations.nonEmpty && !isOptimized(logicalPlan)) { - val plan = processPlan(logicalPlan) - val udfTransformedPlan = pushDownUDFToJoinLeftRelation(plan) - LOGGER.info("Starting to optimize plan") - val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("") - val queryStatistic = new QueryStatistic() - val result = transformCarbonPlan(udfTransformedPlan, relations) - queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ", - System.currentTimeMillis) - recorder.recordStatistics(queryStatistic) - recorder.logStatistics() - result - } else { - LOGGER.info("Skip CarbonOptimizer") - logicalPlan - } - } - - private def processPlan(plan: LogicalPlan): LogicalPlan = { - plan transform { - case ProjectForUpdate(table, cols, Seq(updatePlan)) => - var isTransformed = false - val newPlan = updatePlan transform { - case Project(pList, child) if (!isTransformed) => - val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList - .splitAt(pList.size - cols.size) - val diff = cols.diff(dest.map(_.name)) - if (diff.size > 0) { - sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}") - } - isTransformed = true - Project(dest.filter(a => !cols.contains(a.name)) ++ source, child) - } - ProjectForUpdateCommand(newPlan, table.tableIdentifier) - } - } - private def pushDownUDFToJoinLeftRelation(plan: LogicalPlan): LogicalPlan = { - val output = plan match { - case proj@Project(cols, Join( - left, right, jointype: org.apache.spark.sql.catalyst.plans.JoinType, condition)) => - var projectionToBeAdded: Seq[org.apache.spark.sql.catalyst.expressions.Alias] = Seq.empty - val newCols = cols.map { col => - col match { - case a@Alias(s: ScalaUDF, name) - if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) => - projectionToBeAdded :+= a - AttributeReference(name, StringType, true)().withExprId(a.exprId) - case other => other - } - } - val newLeft = left match { - case Project(columns, logicalPlan) => - Project(columns ++ projectionToBeAdded, logicalPlan) - case filter: Filter => - Project(filter.output ++ projectionToBeAdded, filter) - case other => other - } - Project(newCols, Join(newLeft, right, jointype, condition)) - case other => other - } - output - } - def isOptimized(plan: LogicalPlan): Boolean = { - plan find { - case cd: CarbonDictionaryCatalystDecoder => true - case ic: InsertIntoCarbonTable => true - case other => false - } isDefined - } - - case class ExtraNodeInfo(var hasCarbonRelation: Boolean) - - def fillNodeInfo( - plan: LogicalPlan, - extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = { - plan match { - case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => - val extraNodeInfo = ExtraNodeInfo(true) - extraNodeInfo - case others => - val extraNodeInfo = ExtraNodeInfo(false) - others.children.foreach { childPlan => - val childExtraNodeInfo = fillNodeInfo(childPlan, extraNodeInfos) - if (childExtraNodeInfo.hasCarbonRelation) { - extraNodeInfo.hasCarbonRelation = true - } - } - // only put no carbon realtion plan - if (!extraNodeInfo.hasCarbonRelation) { - extraNodeInfos.put(plan, extraNodeInfo) - } - extraNodeInfo - } - } - - /** - * Steps for changing the plan. - * 1. It finds out the join condition columns and dimension aggregate columns which are need to - * be decoded just before that plan executes. - * 2. Plan starts transform by adding the decoder to the plan where it needs the decoded data - * like dimension aggregate columns decoder under aggregator and join condition decoder under - * join children. - */ - def transformCarbonPlan(plan: LogicalPlan, - relations: Seq[CarbonDecoderRelation]): LogicalPlan = { - if (plan.isInstanceOf[RunnableCommand]) { - return plan - } - var decoder = false - val mapOfNonCarbonPlanNodes = new java.util.HashMap[LogicalPlan, ExtraNodeInfo] - fillNodeInfo(plan, mapOfNonCarbonPlanNodes) - val aliasMap = CarbonAliasDecoderRelation() - // collect alias information before hand. - collectInformationOnAttributes(plan, aliasMap) - - def hasCarbonRelation(currentPlan: LogicalPlan): Boolean = { - val extraNodeInfo = mapOfNonCarbonPlanNodes.get(currentPlan) - if (extraNodeInfo == null) { - true - } else { - extraNodeInfo.hasCarbonRelation - } - } - - val attrMap = new util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]() - relations.foreach(_.fillAttributeMap(attrMap)) - - def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = { - - def transformAggregateExpression(agg: Aggregate, - aggonGroups: util.HashSet[AttributeReferenceWrapper] = null): LogicalPlan = { - val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper] - if (aggonGroups != null) { - attrsOndimAggs.addAll(aggonGroups) - } - agg.aggregateExpressions.map { - case attr: AttributeReference => - case a@Alias(attr: AttributeReference, name) => - case aggExp: AggregateExpression => - aggExp.transform { - case aggExp: AggregateExpression => - collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap) - aggExp - } - case others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - var child = agg.child - // Incase if the child also aggregate then push down decoder to child - if (attrsOndimAggs.size() > 0 && !child.equals(agg)) { - child = CarbonDictionaryTempDecoder(attrsOndimAggs, - new util.HashSet[AttributeReferenceWrapper](), - agg.child) - } - if (!decoder && aggonGroups == null) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child), - isOuter = true) - } else { - Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child) - } - } - - currentPlan match { - case limit@Limit(_, child: Sort) => - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - limit, - isOuter = true) - } else { - limit - } - case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] => - val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]() - sort.order.map { s => - s.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnSort.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - var child = sort.child - if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) { - child = CarbonDictionaryTempDecoder(attrsOnSort, - new util.HashSet[AttributeReferenceWrapper](), sort.child) - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Sort(sort.order, sort.global, child), - isOuter = true) - } else { - Sort(sort.order, sort.global, child) - } - - case union: Union - if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] || - union.right.isInstanceOf[CarbonDictionaryTempDecoder]) => - val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper] - val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper] - val leftLocalAliasMap = CarbonAliasDecoderRelation() - val rightLocalAliasMap = CarbonAliasDecoderRelation() - // collect alias information for the child plan again. It is required as global alias - // may have duplicated in case of aliases - collectInformationOnAttributes(union.left, leftLocalAliasMap) - collectInformationOnAttributes(union.right, rightLocalAliasMap) - union.left.output.foreach { attr => - if (isDictionaryEncoded(attr, attrMap, leftLocalAliasMap)) { - leftCondAttrs.add(AttributeReferenceWrapper(leftLocalAliasMap.getOrElse(attr, attr))) - } - } - union.right.output.foreach { attr => - if (isDictionaryEncoded(attr, attrMap, rightLocalAliasMap)) { - rightCondAttrs.add( - AttributeReferenceWrapper(rightLocalAliasMap.getOrElse(attr, attr))) - } - } - var leftPlan = union.left - var rightPlan = union.right - if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 && - !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - union.left, isOuter = false, Some(leftLocalAliasMap)) - } - if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 && - !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - union.right, isOuter = false, Some(rightLocalAliasMap)) - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Union(leftPlan, rightPlan), - isOuter = true) - } else { - Union(leftPlan, rightPlan) - } - - case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] => - transformAggregateExpression(agg) - case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] => - val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper] - expand.projections.map {s => - s.map { - case attr: AttributeReference => - case a@Alias(attr: AttributeReference, name) => - case others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnExpand.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - } - var child = expand.child - if (attrsOnExpand.size() > 0 && !child.isInstanceOf[Expand]) { - child = CarbonDictionaryTempDecoder(attrsOnExpand, - new util.HashSet[AttributeReferenceWrapper](), - expand.child) - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child), - isOuter = true) - } else { - CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child) - } - case filter: Filter if !filter.child.isInstanceOf[CarbonDictionaryTempDecoder] => - val attrsOnConds = new util.HashSet[AttributeReferenceWrapper] - // In case the child is join then we cannot push down the filters so decode them earlier - if (filter.child.isInstanceOf[Join] || filter.child.isInstanceOf[Sort]) { - filter.condition.collect { - case attr: AttributeReference => - attrsOnConds.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } else { - CarbonFilters - .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap) - } - - var child = filter.child - if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) { - child = CarbonDictionaryTempDecoder(attrsOnConds, - new util.HashSet[AttributeReferenceWrapper](), - filter.child) - } - - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Filter(filter.condition, child), - isOuter = true) - } else { - Filter(filter.condition, child) - } - - case j: Join - if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] || - j.right.isInstanceOf[CarbonDictionaryTempDecoder]) => - val attrsOnJoin = new util.HashSet[Attribute] - j.condition match { - case Some(expression) => - expression.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnJoin.add(aliasMap.getOrElse(attr, attr)) - } - case _ => - } - - val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper] - val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper] - if (attrsOnJoin.size() > 0) { - - attrsOnJoin.asScala.map { attr => - if (qualifierPresence(j.left, attr)) { - leftCondAttrs.add(AttributeReferenceWrapper(attr)) - } - if (qualifierPresence(j.right, attr)) { - rightCondAttrs.add(AttributeReferenceWrapper(attr)) - } - } - var leftPlan = j.left - var rightPlan = j.right - if (leftCondAttrs.size() > 0 && - !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - leftPlan = leftPlan match { - case agg: Aggregate => - CarbonDictionaryTempDecoder(leftCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - transformAggregateExpression(agg, leftCondAttrs)) - case _ => - CarbonDictionaryTempDecoder(leftCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - j.left) - } - } - if (rightCondAttrs.size() > 0 && - !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - rightPlan = rightPlan match { - case agg: Aggregate => - CarbonDictionaryTempDecoder(rightCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - transformAggregateExpression(agg, rightCondAttrs)) - case _ => - CarbonDictionaryTempDecoder(rightCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - j.right) - } - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Join(leftPlan, rightPlan, j.joinType, j.condition), - isOuter = true) - } else { - Join(leftPlan, rightPlan, j.joinType, j.condition) - } - } else { - j - } - - case p: Project - if relations.nonEmpty && !p.child.isInstanceOf[CarbonDictionaryTempDecoder] => - val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper] - p.projectList.map { - case attr: AttributeReference => - case a@Alias(attr: AttributeReference, name) => - case others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - var child = p.child - if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) { - child = CarbonDictionaryTempDecoder(attrsOnProjects, - new util.HashSet[AttributeReferenceWrapper](), - p.child) - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Project(p.projectList, child), - isOuter = true) - } else { - Project(p.projectList, child) - } - - case wd: Window if !wd.child.isInstanceOf[CarbonDictionaryTempDecoder] => - val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper] - wd.projectList.map { - case attr: AttributeReference => - case others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - wd.windowExpressions.map { others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - wd.partitionSpec.map{ - case attr: AttributeReference => - case others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - wd.orderSpec.map { s => - s.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - wd.partitionSpec.map { s => - s.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - var child = wd.child - if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) { - child = CarbonDictionaryTempDecoder(attrsOnProjects, - new util.HashSet[AttributeReferenceWrapper](), - wd.child) - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child), - isOuter = true) - } else { - Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child) - } - - case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), l, isOuter = true) - } else { - l - } - - case others => others - } - - } - - val transFormedPlan = - plan transformDown { - case cd: CarbonDictionaryTempDecoder if cd.isOuter => - decoder = true - cd - case currentPlan => - if (hasCarbonRelation(currentPlan)) { - addTempDecoder(currentPlan) - } else { - currentPlan - } - } - - val processor = new CarbonDecoderProcessor - processor.updateDecoders(processor.getDecoderList(transFormedPlan)) - updateProjection(updateTempDecoder(transFormedPlan, aliasMap, attrMap)) - } - - private def updateTempDecoder(plan: LogicalPlan, - aliasMapOriginal: CarbonAliasDecoderRelation, - attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]): - LogicalPlan = { - var allAttrsNotDecode: util.Set[AttributeReferenceWrapper] = - new util.HashSet[AttributeReferenceWrapper]() - val marker = new CarbonPlanMarker - var aliasMap = aliasMapOriginal - plan transformDown { - case cd: CarbonDictionaryTempDecoder if !cd.processed => - cd.processed = true - allAttrsNotDecode = cd.attrsNotDecode - aliasMap = cd.aliasMap.getOrElse(aliasMap) - marker.pushMarker(allAttrsNotDecode) - if (cd.isOuter) { - CarbonDictionaryCatalystDecoder(relations, - ExcludeProfile(cd.getAttrsNotDecode.asScala.toSeq), - aliasMap, - isOuter = true, - cd.child) - } else { - CarbonDictionaryCatalystDecoder(relations, - IncludeProfile(cd.getAttrList.asScala.toSeq), - aliasMap, - isOuter = false, - cd.child) - } - case cd: CarbonDictionaryCatalystDecoder => - cd - case sort: Sort => - val sortExprs = sort.order.map { s => - s.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - }.asInstanceOf[SortOrder] - } - Sort(sortExprs, sort.global, sort.child) - case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] => - val aggExps = agg.aggregateExpressions.map { aggExp => - aggExp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - }.asInstanceOf[Seq[NamedExpression]] - - val grpExps = agg.groupingExpressions.map { gexp => - gexp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - } - Aggregate(grpExps, aggExps, agg.child) - case expand: Expand => - expand.transformExpressions { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - case filter: Filter => - val filterExps = filter.condition transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - Filter(filterExps, filter.child) - case j: Join => - marker.pushBinaryMarker(allAttrsNotDecode) - j - case u: Union => - marker.pushBinaryMarker(allAttrsNotDecode) - u - case p: Project if relations.nonEmpty => - val prExps = p.projectList.map { prExp => - prExp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - }.asInstanceOf[Seq[NamedExpression]] - Project(prExps, p.child) - case wd: Window if relations.nonEmpty => - val prExps = wd.projectList.map { prExp => - prExp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - }.asInstanceOf[Seq[Attribute]] - val wdExps = wd.windowExpressions.map { gexp => - gexp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - }.asInstanceOf[Seq[NamedExpression]] - val partitionSpec = wd.partitionSpec.map{ exp => - exp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - } - val orderSpec = wd.orderSpec.map { exp => - exp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - } - }.asInstanceOf[Seq[SortOrder]] - Window(prExps, wdExps, partitionSpec, orderSpec, wd.child) - - case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => - allAttrsNotDecode = marker.revokeJoin() - l - case others => others - } - } - - private def updateProjection(plan: LogicalPlan): LogicalPlan = { - val transFormedPlan = plan transform { - case p@Project(projectList: Seq[NamedExpression], cd: CarbonDictionaryCatalystDecoder) => - if (cd.child.isInstanceOf[Filter] || cd.child.isInstanceOf[LogicalRelation]) { - Project(projectList: Seq[NamedExpression], cd.child) - } else { - p - } - case f@Filter(condition: Expression, cd: CarbonDictionaryCatalystDecoder) => - if (cd.child.isInstanceOf[Project] || cd.child.isInstanceOf[LogicalRelation]) { - Filter(condition, cd.child) - } else { - f - } - } - // Remove unnecessary decoders - val finalPlan = transFormedPlan transform { - case CarbonDictionaryCatalystDecoder(_, profile, _, false, child) - if profile.isInstanceOf[IncludeProfile] && profile.isEmpty => - child - } - val updateDtrFn = finalPlan transform { - case p@Project(projectList: Seq[NamedExpression], cd) => - if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) { - p.transformAllExpressions { - case a@Alias(exp, _) - if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => - Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers, - a.explicitMetadata) - case exp: NamedExpression - if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => - CustomDeterministicExpression(exp) - } - } else { - p - } - case f@Filter(condition: Expression, cd) => - if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) { - f.transformAllExpressions { - case a@Alias(exp, _) - if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => - Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers, - a.explicitMetadata) - case exp: NamedExpression - if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => - CustomDeterministicExpression(exp) - } - } else { - f - } - } - - updateDtrFn - } - - private def collectInformationOnAttributes(plan: LogicalPlan, - aliasMap: CarbonAliasDecoderRelation) { - plan transformAllExpressions { - case a@Alias(exp, name) => - exp match { - case attr: Attribute => aliasMap.put(a.toAttribute, attr) - case _ => aliasMap.put(a.toAttribute, AttributeReference("", StringType)()) - } - a - } - // collect the output of expand and add projections attributes as alias to it. - plan.collect { - case expand: Expand => - expand.projections.foreach {s => - s.zipWithIndex.foreach { f => - f._1 match { - case attr: AttributeReference => - aliasMap.put(expand.output(f._2).toAttribute, attr) - case a@Alias(attr: AttributeReference, name) => - aliasMap.put(expand.output(f._2).toAttribute, attr) - case others => - } - } - } - } - } - - // Collect aggregates on dimensions so that we can add decoder to it. - private def collectDimensionAggregates(aggExp: AggregateExpression, - attrsOndimAggs: util.HashSet[AttributeReferenceWrapper], - aliasMap: CarbonAliasDecoderRelation, - attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]) { - aggExp collect { - case attr: AttributeReference if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - - /** - * Update the attribute datatype with [IntegerType] if the carbon column is encoded with - * dictionary. - * - */ - private def updateDataType(attr: Attribute, - attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation], - allAttrsNotDecode: java.util.Set[AttributeReferenceWrapper], - aliasMap: CarbonAliasDecoderRelation): Attribute = { - val uAttr = aliasMap.getOrElse(attr, attr) - val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr))) - if (relation.isDefined) { - relation.get.dictionaryMap.get(uAttr.name) match { - case Some(true) - if !allAttrsNotDecode.contains(AttributeReferenceWrapper(uAttr)) => - val newAttr = AttributeReference(attr.name, - IntegerType, - attr.nullable, - attr.metadata)(attr.exprId, attr.qualifiers) - newAttr - case _ => attr - } - } else { - attr - } - } - - private def isDictionaryEncoded(attr: Attribute, - attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation], - aliasMap: CarbonAliasDecoderRelation): Boolean = { - val uAttr = aliasMap.getOrElse(attr, attr) - val relation = Option(attrMap.get(AttributeReferenceWrapper(uAttr))) - if (relation.isDefined) { - relation.get.dictionaryMap.get(uAttr.name) match { - case Some(true) => true - case _ => false - } - } else { - false - } - } - - def qualifierPresence(plan: LogicalPlan, attr: Attribute): Boolean = { - var present = false - plan collect { - case l: LogicalRelation if l.attributeMap.contains(attr) => - present = true - } - present - } -} - -case class CarbonDecoderRelation( - attributeMap: AttributeMap[AttributeReference], - carbonRelation: CarbonDatasourceRelation) { - - val extraAttrs = new ArrayBuffer[Attribute]() - - def addAttribute(attr: Attribute): Unit = { - extraAttrs += attr - } - - def contains(attr: Attribute): Boolean = { - val exists = - attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) && - entry._1.exprId.equals(attr.exprId)) || - extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) && - entry.exprId.equals(attr.exprId)) - exists - } - - def fillAttributeMap(attrMap: java.util.HashMap[AttributeReferenceWrapper, - CarbonDecoderRelation]): Unit = { - attributeMap.foreach { attr => - attrMap.put(AttributeReferenceWrapper(attr._1), this) - } - } - - lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap -} - -
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala deleted file mode 100644 index bb00126..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/test/SparkTestQueryExecutor.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.test - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{CarbonContext, DataFrame, SQLContext} - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -/** - * This class is a sql executor of unit test case for spark version 1.x. - */ - -class SparkTestQueryExecutor extends TestQueryExecutorRegister { - override def sql(sqlText: String): DataFrame = SparkTestQueryExecutor.cc.sql(sqlText) - - override def sqlContext: SQLContext = SparkTestQueryExecutor.cc - - override def stop(): Unit = SparkTestQueryExecutor.cc.sparkContext.stop() -} - -object SparkTestQueryExecutor { - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - LOGGER.info("use TestQueryExecutorImplV1") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, - System.getProperty("java.io.tmpdir")) - .addProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL) - .addProperty(CarbonCommonConstants.STORE_LOCATION, TestQueryExecutor.storeLocation) - - val sc = new SparkContext(new SparkConf() - .setAppName("CarbonSpark") - .setMaster("local[2]") - .set("spark.sql.shuffle.partitions", "20")) - sc.setLogLevel("ERROR") - - val cc = new CarbonContext(sc, TestQueryExecutor.storeLocation, TestQueryExecutor.metastoredb) -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala deleted file mode 100644 index e73f78c..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.util - -import org.apache.spark.TaskContext - - -object TaskContextUtil { - def setTaskContext(context: TaskContext): Unit = { - val localThreadContext = TaskContext.get() - if (localThreadContext == null) { - TaskContext.setTaskContext(context) - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---------------------------------------------------------------------- diff --git a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister deleted file mode 100644 index d09c9b5..0000000 --- a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ /dev/null @@ -1,17 +0,0 @@ -## ------------------------------------------------------------------------ -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## ------------------------------------------------------------------------ -org.apache.spark.sql.CarbonSource \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister ---------------------------------------------------------------------- diff --git a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister b/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister deleted file mode 100644 index fc96db4..0000000 --- a/integration/spark/src/resources/META-INF/services/org.apache.spark.sql.test.TestQueryExecutorRegister +++ /dev/null @@ -1,17 +0,0 @@ -## ------------------------------------------------------------------------ -## Licensed to the Apache Software Foundation (ASF) under one or more -## contributor license agreements. See the NOTICE file distributed with -## this work for additional information regarding copyright ownership. -## The ASF licenses this file to You under the Apache License, Version 2.0 -## (the "License"); you may not use this file except in compliance with -## the License. You may obtain a copy of the License at -## -## http://www.apache.org/licenses/LICENSE-2.0 -## -## Unless required by applicable law or agreed to in writing, software -## distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -## See the License for the specific language governing permissions and -## limitations under the License. -## ------------------------------------------------------------------------ -org.apache.spark.sql.test.SparkTestQueryExecutor \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/resources/badrecords/test2.csv ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/resources/badrecords/test2.csv b/integration/spark/src/test/resources/badrecords/test2.csv deleted file mode 100644 index 51d25b2..0000000 --- a/integration/spark/src/test/resources/badrecords/test2.csv +++ /dev/null @@ -1,4 +0,0 @@ -0,569,silo -1,843658743265874365874365874365584376547598375987,hello -2,87436587349436587568784658743065874376,priyal -3,56985,simple \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala deleted file mode 100644 index aaaf66b..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.integration.spark.testsuite.complexType - -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -/** - * Test class of creating and loading for carbon table with double - * - */ -class TestComplexPrimitiveTimestampDirectDictionary extends QueryTest with BeforeAndAfterAll { - - override def beforeAll: Unit = { - sql("drop table if exists complexcarbontimestamptable") - sql("drop table if exists complexhivetimestamptable") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSS") - sql("CREATE TABLE complexcarbontimestamptable (empno string,workdate Timestamp,punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>, salary double) STORED BY 'org.apache.carbondata.format'") - sql(s"LOAD DATA local inpath '$resourcesPath/datasamplecomplex.csv' INTO TABLE complexcarbontimestamptable OPTIONS" + - "('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='empno,workdate,punchinout,worktime,salary')"); - sql("CREATE TABLE complexhivetimestamptable (empno string,workdate Timestamp,punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>, salary double)row format delimited fields terminated by ',' collection items terminated by '$'") - sql(s"LOAD DATA local inpath '$resourcesPath/datasamplecomplex.csv' INTO TABLE complexhivetimestamptable") - } - - test("select * query") { - checkAnswer(sql("select * from complexcarbontimestamptable"), - sql("select * from complexhivetimestamptable")) - } - - test("timestamp complex type in the middle of complex types") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSS") - sql("CREATE TABLE testtimestampcarbon(imei string,rat array<string>, sid array<int>, end_time array<Timestamp>, probeid array<double>, contact struct<name:string, id:string>)STORED BY 'org.apache.carbondata.format'") - sql("LOAD DATA local inpath '" + resourcesPath + "/timestampdata.csv' INTO TABLE testtimestampcarbon options('DELIMITER'=',', 'QUOTECHAR'='\"','COMPLEX_DELIMITER_LEVEL_1'='$', 'FILEHEADER'='imei,rat,sid,end_time,probeid,contact')") - sql("CREATE TABLE testtimestamphive(imei string,rat array<string>, sid array<int>, end_time array<Timestamp>, probeid array<double>, contact struct<name:string, id:string>)row format delimited fields terminated by ',' collection items terminated by '$'") - sql(s"LOAD DATA local inpath '$resourcesPath/timestampdata.csv' INTO TABLE testtimestamphive") - checkAnswer(sql("select * from testtimestampcarbon"), sql("select * from testtimestamphive")) - sql("drop table if exists testtimestampcarbon") - sql("drop table if exists testtimestamphive") - } - - override def afterAll { - sql("drop table if exists complexcarbontimestamptable") - sql("drop table if exists complexhivetimestamptable") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala deleted file mode 100644 index 98e4f18..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.integration.spark.testsuite.dataload - - -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} -import org.scalatest.BeforeAndAfterAll - -class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll { - - var df: DataFrame = _ - - override def beforeAll { - sql("DROP TABLE IF EXISTS carbon1") - - import sqlContext.implicits._ - df = sqlContext.sparkContext.parallelize(1 to 1000) - .map(x => ("a", "b", x)) - .toDF("c1", "c2", "c3") - - // save dataframe to carbon file - df.write - .format("carbondata") - .option("tableName", "carbon1") - .mode(SaveMode.Overwrite) - .save() - } - - test("read and write using CarbonContext") { - val in = sqlContext.read - .format("carbondata") - .option("tableName", "carbon1") - .load() - - assert(in.where("c3 > 500").count() == 500) - } - - test("read and write using CarbonContext with compression") { - val in = sqlContext.read - .format("carbondata") - .option("tableName", "carbon1") - .option("compress", "true") - .load() - - assert(in.where("c3 > 500").count() == 500) - } - - test("test overwrite") { - sql("DROP TABLE IF EXISTS carbon4") - df.write - .format("carbondata") - .option("tableName", "carbon4") - .mode(SaveMode.Overwrite) - .save() - df.write - .format("carbondata") - .option("tableName", "carbon4") - .mode(SaveMode.Overwrite) - .save() - val in = sqlContext.read - .format("carbondata") - .option("tableName", "carbon4") - .load() - assert(in.where("c3 > 500").count() == 500) - sql("DROP TABLE IF EXISTS carbon4") - } - - test("read and write using CarbonContext, multiple load") { - sql("DROP TABLE IF EXISTS carbon4") - df.write - .format("carbondata") - .option("tableName", "carbon4") - .mode(SaveMode.Overwrite) - .save() - df.write - .format("carbondata") - .option("tableName", "carbon4") - .mode(SaveMode.Append) - .save() - val in = sqlContext.read - .format("carbondata") - .option("tableName", "carbon4") - .load() - assert(in.where("c3 > 500").count() == 1000) - sql("DROP TABLE IF EXISTS carbon4") - } - - test("query using SQLContext") { - val newSQLContext = new SQLContext(sqlContext.sparkContext) - newSQLContext.sql( - s""" - | CREATE TEMPORARY TABLE temp - | (c1 string, c2 string, c3 int) - | USING org.apache.spark.sql.CarbonSource - | OPTIONS (path '$storeLocation/default/carbon1') - """.stripMargin) - checkAnswer(newSQLContext.sql( - """ - | SELECT c1, c2, count(*) - | FROM temp - | WHERE c3 > 100 - | GROUP BY c1, c2 - """.stripMargin), Seq(Row("a", "b", 900))) - newSQLContext.dropTempTable("temp") - } - - test("query using SQLContext without providing schema") { - val newSQLContext = new SQLContext(sqlContext.sparkContext) - newSQLContext.sql( - s""" - | CREATE TEMPORARY TABLE temp - | USING org.apache.spark.sql.CarbonSource - | OPTIONS (path '$storeLocation/default/carbon1') - """.stripMargin) - checkAnswer(newSQLContext.sql( - """ - | SELECT c1, c2, count(*) - | FROM temp - | WHERE c3 > 100 - | GROUP BY c1, c2 - """.stripMargin), Seq(Row("a", "b", 900))) - newSQLContext.dropTempTable("temp") - } - - test("query using SQLContext, multiple load") { - sql("DROP TABLE IF EXISTS test") - sql( - """ - | CREATE TABLE test(id int, name string, city string, age int) - | STORED BY 'org.apache.carbondata.format' - """.stripMargin) - val testData = s"${resourcesPath}/sample.csv" - sql(s"LOAD DATA LOCAL INPATH '$testData' into table test") - sql(s"LOAD DATA LOCAL INPATH '$testData' into table test") - - val newSQLContext = new SQLContext(sqlContext.sparkContext) - newSQLContext.sql( - s""" - | CREATE TEMPORARY TABLE temp - | (id int, name string, city string, age int) - | USING org.apache.spark.sql.CarbonSource - | OPTIONS (path '$storeLocation/default/test') - """.stripMargin) - checkAnswer(newSQLContext.sql( - """ - | SELECT count(id) - | FROM temp - """.stripMargin), Seq(Row(8))) - newSQLContext.dropTempTable("temp") - sql("DROP TABLE test") - } - - test("json data with long datatype issue CARBONDATA-405") { - val jsonDF = sqlContext.read.format("json").load(s"$resourcesPath/test.json") - jsonDF.write - .format("carbondata") - .option("tableName", "dftesttable") - .option("compress", "true") - .mode(SaveMode.Overwrite) - .save() - val carbonDF = sqlContext - .read - .format("carbondata") - .option("tableName", "dftesttable") - .load() - checkAnswer( - carbonDF.select("age", "name"), - jsonDF.select("age", "name")) - sql("drop table dftesttable") - } - - override def afterAll { - sql("DROP TABLE IF EXISTS carbon1") - sql("DROP TABLE IF EXISTS carbon2") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala deleted file mode 100644 index b61ecce..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSingleQuotechar.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.integration.spark.testsuite.dataload - -import org.apache.spark.sql.Row -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll -/** - * Test Class for data loading when there is single quote in fact data - * - */ -class TestLoadDataWithSingleQuotechar extends QueryTest with BeforeAndAfterAll { - override def beforeAll { - sql("DROP TABLE IF EXISTS carbontable") - sql( - "CREATE TABLE carbontable (id Int, name String) STORED BY 'carbondata'") - } - - test("test data loading with single quote char") { - try { - sql( - s"LOAD DATA LOCAL INPATH '$resourcesPath/dataWithSingleQuote.csv' INTO TABLE " + - "carbontable OPTIONS('DELIMITER'= ',')") - sql("SELECT * from carbontable") - checkAnswer( - sql("SELECT * from carbontable"), - Seq(Row(1,"Tom"), - Row(2,"Tony\n3,Lily"), - Row(4,"Games\""), - Row(5,"prival\"\n6,\"hello\"") - ) - ) - } catch { - case e: Throwable => - assert(false) - } - } - - override def afterAll { - sql("DROP TABLE carbontable") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala deleted file mode 100644 index 1c003c7..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllQueriesSpark1TestCase.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.testsuite.allqueries - -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{Row, SaveMode} -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -/** - * Test Class for all query on multiple datatypes - * - */ -class AllQueriesSpark1TestCase extends QueryTest with BeforeAndAfterAll { - - override def beforeAll { - clean - - sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active _phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - sql("LOAD DATA LOCAL INPATH '" + resourcesPath + "/100_olap.csv' INTO table Carbon_automation_test options('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'= 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Lat est_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')") - } - - def clean { - sql("drop table if exists Carbon_automation_test") - } - - override def afterAll { - clean - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") - } - - - //TC_113 - test("select percentile_approx(deviceInformationId,0.2) as a from Carbon_automation_test")({ - checkAnswer( - sql("select percentile_approx(deviceInformationId,0.2) as a from Carbon_automation_test"), - Seq(Row(100005.8))) - }) - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala deleted file mode 100644 index d762ec6..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableSpark1TestCase.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.spark.testsuite.allqueries - -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties - -class InsertIntoCarbonTableSpark1TestCase extends QueryTest with BeforeAndAfterAll { - override def beforeAll { - sql("drop table if exists THive") - sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions st ring, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','") - sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE THive") - } - - - test("insert from carbon-select columns-source table has more column then target column") { - val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - - sql("drop table if exists load") - sql("drop table if exists inser") - sql("CREATE TABLE load(imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp,name string,point int)STORED BY 'org.apache.carbondata.format'") - sql("LOAD DATA INPATH '" + resourcesPath + "/shortolap.csv' INTO TABLE load options ('DELIMITER'=',', 'QUOTECHAR'='\"','FILEHEADER' = 'imei,age,task,num,level,productdate,name,point')") - sql("CREATE TABLE inser(imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp)STORED BY 'org.apache.carbondata.format'") - sql("insert into inser select * from load") - checkAnswer( - sql("select * from inser"), - sql("select imei,age,task,num,level,productdate from load") - ) - sql("drop table if exists load") - sql("drop table if exists inser") - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig) - } - - test("insert->hive column more than carbon column->success") { - sql("drop table if exists TCarbon") - sql("create table TCarbon (imei string,deviceInformationId int,MAC string,deviceColor string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'") - - sql("insert into TCarbon select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber,device_backColor,modelId,CUPAudit,CPIClocked from THive") - checkAnswer( - sql("select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber from THive"), - sql("select imei,deviceInformationId,MAC,deviceColor,gamePointId,contractNumber from TCarbon") - ) - } - -// test("insert->insert empty data -pass") { -// sql("drop table if exists TCarbon") -// sql("create table TCarbon (imei string,deviceInformationId int,MAC string) STORED BY 'org.apache.carbondata.format'") -// sql("insert into TCarbon select imei,deviceInformationId,MAC from THive where MAC='wrongdata'") -// val result = sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'").collect() -// checkAnswer( -// sql("select imei,deviceInformationId,MAC from THive where MAC='wrongdata'"), -// sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'") -// ) -// } - - override def afterAll { - sql("drop table if exists load") - sql("drop table if exists inser") - sql("DROP TABLE IF EXISTS THive") - sql("DROP TABLE IF EXISTS TCarbon") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala deleted file mode 100644 index 7aee00d..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.carbondata.spark.testsuite.badrecordloger - -import java.io.File - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.hive.HiveContext -import org.scalatest.BeforeAndAfterAll - - -/** - * Test Class for detailed query on timestamp datatypes - * - * - */ -class BadRecordLoggerSharedDictionaryTest extends QueryTest with BeforeAndAfterAll { - var hiveContext: HiveContext = _ - var csvFilePath : String = null - var timestamp_format: String = null - - override def beforeAll { - sql("drop table IF EXISTS testdrive") - sql( - """create table testdrive (ID int,CUST_ID int,cust_name string) - STORED BY 'org.apache.carbondata.format' - TBLPROPERTIES("columnproperties.cust_name.shared_column"="shared.cust_name", - "columnproperties.ID.shared_column"="shared.ID", - "columnproperties.CUST_ID.shared_column"="shared.CUST_ID", - 'DICTIONARY_INCLUDE'='ID,CUST_ID')""" - ) - - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - new File("./target/test/badRecords") - .getCanonicalPath - ) - - val carbonProp = CarbonProperties.getInstance() - timestamp_format = carbonProp.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - carbonProp.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/mm/dd") - val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../") - .getCanonicalPath - csvFilePath = currentDirectory + "/src/test/resources/badrecords/test2.csv" - - } - test("dataload with bad record test") { - try { - sql( - s"""LOAD DATA INPATH '$csvFilePath' INTO TABLE testdrive OPTIONS('DELIMITER'=',', - |'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL', - |'FILEHEADER'= 'ID,CUST_ID,cust_name')""".stripMargin) - } catch { - case e: Throwable => - assert(e.getMessage.contains("Data load failed due to bad record")) - } - } - - override def afterAll { - sql("drop table IF EXISTS testdrive") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timestamp_format) - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala deleted file mode 100644 index 7400839..0000000 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createtable/TestCreateTableSyntax.scala +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.testsuite.createtable - -import org.apache.spark.sql.Row -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - -/** - * Test Class for validating create table syntax for carbontable - * - */ -class TestCreateTableSyntax extends QueryTest with BeforeAndAfterAll { - - override def beforeAll { - } - - test("Struct field with underscore and struct<struct> syntax check") { - sql("drop table if exists carbontable") - sql("create table carbontable(id int, username struct<sur_name:string," + - "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)" + - "STORED BY 'org.apache.carbondata.format'") - sql("describe carbontable") - } - - test("Test table rename operation on carbon table and on hive table") { - sql("drop table if exists hivetable") - sql("drop table if exists carbontable") - sql("drop table if exists hiveRenamedTable") - sql("drop table if exists carbonRenamedTable") - sql("create table hivetable(test1 int, test2 array<String>,test3 array<bigint>,"+ - "test4 array<int>,test5 array<decimal>,test6 array<timestamp>,test7 array<double>)"+ - "row format delimited fields terminated by ',' collection items terminated by '$' map keys terminated by ':'") - sql("alter table hivetable rename To hiveRenamedTable") - sql("create table carbontable(test1 int, test2 array<String>,test3 array<bigint>,"+ - "test4 array<int>,test5 array<decimal>,test6 array<timestamp>,test7 array<double>)"+ - "STORED BY 'org.apache.carbondata.format'") - sql("alter table carbontable compact 'minor'") - try { - sql("alter table carbontable rename To carbonRenamedTable") - assert(false) - } catch { - case e : MalformedCarbonCommandException => { - assert(e.getMessage.equals("Unsupported alter operation on carbon table")) - } - } - } - - - test("test carbon table create with complex datatype as dictionary exclude") { - try { - sql("drop table if exists carbontable") - sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+ - "country string, salary double) STORED BY 'org.apache.carbondata.format' " + - "TBLPROPERTIES('DICTIONARY_EXCLUDE'='dept,mobile')") - assert(false) - } catch { - case e : MalformedCarbonCommandException => { - assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for complex datatype column: mobile")) - } - } - } - - test("test carbon table create with double datatype as dictionary exclude") { - try { - sql("drop table if exists carbontable") - sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+ - "country string, salary double) STORED BY 'org.apache.carbondata.format' " + - "TBLPROPERTIES('DICTIONARY_EXCLUDE'='salary')") - assert(false) - } catch { - case e : MalformedCarbonCommandException => { - assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for double " + - "data type column: salary")) - } - } - } - test("test carbon table create with int datatype as dictionary exclude") { - sql("drop table if exists carbontable") - sql("create table carbontable(id int, name string, dept string, mobile array<string>, " + - "country string, salary double) STORED BY 'org.apache.carbondata.format' " + - "TBLPROPERTIES('DICTIONARY_EXCLUDE'='id')") - assert(true) - } - - test("test carbon table create with decimal datatype as dictionary exclude") { - try { - sql("drop table if exists carbontable") - sql("create table carbontable(id int, name string, dept string, mobile array<string>, "+ - "country string, salary decimal) STORED BY 'org.apache.carbondata.format' " + - "TBLPROPERTIES('DICTIONARY_EXCLUDE'='salary')") - assert(false) - } catch { - case e : MalformedCarbonCommandException => { - assert(e.getMessage.equals("DICTIONARY_EXCLUDE is unsupported for decimal " + - "data type column: salary")) - } - } - } - - test("describe formatted on hive table and carbon table") { - sql("drop table if exists hivetable") - sql("drop table if exists carbontable") - sql("create table carbontable(id int, username struct<sur_name:string," + - "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)" + - "STORED BY 'org.apache.carbondata.format'") - sql("describe formatted carbontable") - sql("create table hivetable(id int, username struct<sur_name:string," + - "actual_name:struct<first_name:string,last_name:string>>, country string, salary double)") - sql("describe formatted hivetable") - } - - test("describe command carbon table for decimal scale and precision test") { - sql("drop table if exists carbontablePrecision") - sql("create table carbontablePrecision(id int, name string, dept string, mobile array<string>, "+ - "country string, salary decimal(10,6)) STORED BY 'org.apache.carbondata.format' " + - "TBLPROPERTIES('DICTIONARY_INCLUDE'='salary,id')") - checkAnswer( - sql("describe carbontablePrecision"), - Seq(Row("country","string",""), - Row("dept","string",""),Row("id","int",""),Row("mobile","array<string>",""),Row("name","string",""), - Row("salary","decimal(10,6)","") - ) - ) - } - - test("create carbon table without dimensions") { - try { - sql("drop table if exists carbontable") - sql("create table carbontable(msr1 int, msr2 double, msr3 bigint, msr4 decimal)" + - " stored by 'org.apache.carbondata.format'") - assert(true) - } catch { - case e : MalformedCarbonCommandException => { - assert(e.getMessage.equals("Table default.carbontable can not be created without " + - "key columns. Please use DICTIONARY_INCLUDE or DICTIONARY_EXCLUDE to " + - "set at least one key column if all specified columns are numeric types")) - } - } - } - - test("create carbon table with repeated table properties") { - try { - sql("drop table if exists carbontable") - sql( - """ - CREATE TABLE IF NOT EXISTS carbontable - (ID Int, date Timestamp, country String, - name String, phonetype String, serialname String, salary Int) - STORED BY 'carbondata' - TBLPROPERTIES('DICTIONARY_EXCLUDE'='country','DICTIONARY_INCLUDE'='ID', - 'DICTIONARY_EXCLUDE'='phonetype', 'DICTIONARY_INCLUDE'='salary') - """) - assert(false) - } catch { - case e : MalformedCarbonCommandException => { - assert(e.getMessage.equals("Table properties is repeated: dictionary_include,dictionary_exclude")) - } - } - } - - override def afterAll { - sql("drop table if exists hivetable") - sql("drop table if exists carbontable") - sql("drop table if exists hiveRenamedTable") - sql("drop table if exists carbonRenamedTable") - sql("drop table if exists carbontablePrecision") - } -} \ No newline at end of file
