http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 600519f..4c7c460 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, CarbonMetastoreTypes}
 import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -54,20 +54,19 @@ case class CarbonDictionaryDecoder(
     child.output.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
-      if(relation.isDefined) {
+      if(relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension = carbonTable
           .getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-            canBeDecoded(attr)) {
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           val newAttr = AttributeReference(a.name,
-            convertCarbonToSparkDataType(carbonDimension),
+            convertCarbonToSparkDataType(carbonDimension,
+              relation.get.carbonRelation.carbonRelation),
             a.nullable,
             a.metadata)(a.exprId,
             a.qualifiers).asInstanceOf[Attribute]
-          newAttr.resolved
           newAttr
         } else {
           a
@@ -89,15 +88,29 @@ case class CarbonDictionaryDecoder(
     }
   }
 
-  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension): types.DataType = {
+  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+      relation: CarbonRelation): types.DataType = {
     carbonDimension.getDataType match {
       case DataType.STRING => StringType
       case DataType.INT => IntegerType
       case DataType.LONG => LongType
       case DataType.DOUBLE => DoubleType
       case DataType.BOOLEAN => BooleanType
-      case DataType.DECIMAL => DecimalType.DoubleDecimal
+      case DataType.DECIMAL =>
+        val scale: Int = carbonDimension.getColumnSchema.getScale
+        val precision: Int = carbonDimension.getColumnSchema.getPrecision
+        if (scale > 0 && precision > 0) {
+          DecimalType(precision, scale)
+        } else {
+          DecimalType(18, 2)
+        }
       case DataType.TIMESTAMP => TimestampType
+      case DataType.STRUCT =>
+        CarbonMetastoreTypes
+        .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+      case DataType.ARRAY =>
+        CarbonMetastoreTypes
+        .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }
   }
 

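The DECIMAL branch above now honours the column's stored precision and scale, with a default when they are absent. A minimal standalone sketch of that mapping, using hypothetical stand-in types (ColumnSchema, DecimalSpec) rather than the real CarbonDimension or Spark DecimalType:

object DecimalMappingSketch extends App {
  // Hypothetical stand-ins for illustration only; the real code maps a
  // CarbonDimension's column schema to org.apache.spark.sql.types.DecimalType.
  case class ColumnSchema(precision: Int, scale: Int)
  case class DecimalSpec(precision: Int, scale: Int)

  // Use the stored precision/scale only when both are set; otherwise fall back
  // to a default of (18, 2), mirroring the DECIMAL branch in the patch above.
  def toDecimalSpec(schema: ColumnSchema): DecimalSpec =
    if (schema.scale > 0 && schema.precision > 0) {
      DecimalSpec(schema.precision, schema.scale)
    } else {
      DecimalSpec(precision = 18, scale = 2)
    }

  println(toDecimalSpec(ColumnSchema(precision = 10, scale = 3))) // DecimalSpec(10,3)
  println(toDecimalSpec(ColumnSchema(precision = 0, scale = 0)))  // falls back to DecimalSpec(18,2)
}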
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index cb20246..ba4c37e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -26,24 +26,20 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.LeafNode
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.util.CarbonProperties
 import org.carbondata.query.carbon.model._
-import org.carbondata.spark.{CarbonFilters, RawKey, RawKeyImpl}
+import org.carbondata.spark.{CarbonFilters, RawValue, RawValueImpl}
 import org.carbondata.spark.rdd.CarbonScanRDD
 
 case class CarbonScan(
     var attributesRaw: Seq[Attribute],
     relationRaw: CarbonRelation,
-    dimensionPredicatesRaw: Seq[Expression],
-    aggExprsRaw: Option[Seq[Expression]],
-    useBinaryAggregator: Boolean)(@transient val ocRaw: SQLContext) extends LeafNode {
+    dimensionPredicatesRaw: Seq[Expression])(@transient val ocRaw: SQLContext) extends LeafNode {
   val carbonTable = relationRaw.metaData.carbonTable
   val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
   val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
@@ -95,24 +91,6 @@ case class CarbonScan(
           }
         }
       }
-    // Just find out that any aggregation functions are present on dimensions.
-    aggExprsRaw match {
-      case Some(aggExprs) =>
-        aggExprs.foreach {
-          case Alias(agg: AggregateExpression, name) =>
-            agg.collect {
-              case attr: AttributeReference =>
-                val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-                if(dims.nonEmpty) {
-                  plan.addAggDimAggInfo(dims.head.getColumnName,
-                    dims.head.getAggregateFunction,
-                    dims.head.getQueryOrder)
-                }
-            }
-          case _ =>
-        }
-      case _ =>
-    }
 
     // Fill the selected dimensions & measures obtained from
     // attributes to query plan  for detailed query
@@ -179,14 +157,13 @@ case class CarbonScan(
   }
 
 
-  def inputRdd: CarbonScanRDD[Array[Any], Any] = {
+  def inputRdd: CarbonScanRDD[Array[Any]] = {
 
     val conf = new Configuration()
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    buildCarbonPlan.getDimAggregatorInfos.clear()
     val model = QueryModel.createModel(
       absoluteTableIdentifier, buildCarbonPlan, carbonTable)
-    val kv: RawKey[Array[Any], Any] = new RawKeyImpl()
+    val kv: RawValue[Array[Any]] = new RawValueImpl
     // setting queryid
     buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
 
@@ -224,9 +201,9 @@ case class CarbonScan(
 
         override def next(): InternalRow =
           if (outUnsafeRows) {
-            unsafeProjection(new GenericMutableRow(iter.next()._1.map(toType)))
+            unsafeProjection(new GenericMutableRow(iter.next().map(toType)))
           } else {
-            new GenericMutableRow(iter.next()._1.map(toType))
+            new GenericMutableRow(iter.next().map(toType))
           }
       }
     }

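With the key/value pairs gone, the scan iterator above wraps each raw Array[Any] directly into a row after per-element conversion. A rough standalone sketch of that wrapping, with SimpleRow standing in for Spark's GenericMutableRow and an illustrative toType conversion:

object RowWrapSketch extends App {
  // SimpleRow is a stand-in for Spark's GenericMutableRow, for illustration only.
  final case class SimpleRow(values: Array[Any])

  // The upstream iterator now yields Array[Any] directly (no ._1 tuple access);
  // each element is converted before being wrapped into a row.
  def wrapRows(raw: Iterator[Array[Any]], toType: Any => Any): Iterator[SimpleRow] =
    raw.map(values => SimpleRow(values.map(toType)))

  val toType: Any => Any = {
    case s: String => s.toUpperCase // e.g. String -> UTF8String in the real code
    case other => other
  }
  wrapRows(Iterator(Array[Any]("a", 1)), toType)
    .foreach(r => println(r.values.mkString(", ")))
}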
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
index 96a86a7..6ed8c0d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala
@@ -20,13 +20,6 @@ package org.apache.spark.sql
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.hive.CarbonSQLDialect
 
-object CarbonSQLConf {
-
-  val PUSH_COMPUTATION = SQLConfEntry.booleanConf("spark.sql.carbon.push.computation",
-    defaultValue = Some(true))
-
-}
-
  /**
  * A trait that enables the setting and getting of mutable config parameters/hints.
   *
@@ -40,8 +33,4 @@ class CarbonSQLConf extends SQLConf {
 
   override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
 
-  import CarbonSQLConf._
-
-  private[sql] def pushComputation: Boolean = getConf(PUSH_COMPUTATION)
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index a72aaa1..9239cec 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -170,7 +170,7 @@ class CarbonSqlParser()
     initLexical
     phrase(start)(new lexical.Scanner(input)) match {
       case Success(plan, _) => plan match {
-        case x: LoadCube =>
+        case x: LoadTable =>
           x.inputSqlString = input
           x
         case logicalPlan => logicalPlan
@@ -969,7 +969,7 @@ class CarbonSqlParser()
 
         }
         val patitionOptionsMap = partionDataOptions.toMap
-        LoadCube(schema, cubename, filePath, dimFolderPath.getOrElse(Seq()),
+        LoadTable(schema, cubename, filePath, dimFolderPath.getOrElse(Seq()),
             patitionOptionsMap, false)
     }
 
@@ -985,7 +985,7 @@ class CarbonSqlParser()
             validateOptions(partionDataOptions)
           }
           val patitionOptionsMap = partionDataOptions.getOrElse(List.empty[(String, String)]).toMap
-          LoadCube(schema, cubename, filePath, Seq(), patitionOptionsMap, isOverwrite.isDefined)
+          LoadTable(schema, cubename, filePath, Seq(), patitionOptionsMap, isOverwrite.isDefined)
       }
 
   private def validateOptions(partionDataOptions: Option[List[(String, String)]]): Unit = {
@@ -1079,9 +1079,7 @@ class CarbonSqlParser()
       opt(";") ^^ {
       case tabletype ~ exists ~ schemaName ~ resourceName =>
         tabletype match {
-          case agg ~ table =>
-            DropAggregateTableCommand(exists.isDefined, schemaName, resourceName.toLowerCase())
-          case _ => DropCubeCommand(exists.isDefined, schemaName, resourceName.toLowerCase())
+          case _ => DropTableCommand(exists.isDefined, schemaName, resourceName.toLowerCase())
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index fcc2e8a..e36d148 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1381,7 +1381,7 @@ private[sql] case class DeleteLoadsByLoadDate(
 
 }
 
-private[sql] case class LoadCube(
+private[sql] case class LoadTable(
     schemaNameOp: Option[String],
     tableName: String,
     factPathFromUser: String,
@@ -1693,7 +1693,7 @@ private[sql] case class MergeTable(dbName: String, cubeName: String, tableName:
   }
 }
 
-private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Option[String],
+private[sql] case class DropTableCommand(ifExistsSet: Boolean, schemaNameOp: Option[String],
     tableName: String)
   extends RunnableCommand {
 
@@ -1755,7 +1755,7 @@ private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Opti
                 Some(relation.cubeMeta.carbonTableIdentifier.getDatabaseName))
               )(sqlContext)
           CarbonDataRDDFactory
-            .dropCube(sqlContext.sparkContext, dbName, tableName,
+            .dropTable(sqlContext.sparkContext, dbName, tableName,
               relation.cubeMeta.partitioner)
           QueryPartitionHelper.getInstance().removePartition(dbName, tableName)
 
@@ -1789,33 +1789,6 @@ private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Opti
   }
 }
 
-private[sql] case class DropAggregateTableCommand(ifExistsSet: Boolean,
-    schemaNameOp: Option[String],
-    tableName: String) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
-
-    if (relation == null) {
-      if (!ifExistsSet) {
-        sys.error(s"Aggregate Table $dbName.$tableName does not exist")
-      }
-    }
-    else {
-      CarbonDataRDDFactory.dropAggregateTable(
-        sqlContext.sparkContext,
-        dbName,
-        tableName,
-        relation.cubeMeta.partitioner)
-    }
-
-    Seq.empty
-  }
-}
-
 private[sql] case class ShowLoads(
     schemaNameOp: Option[String],
     tableName: String,

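The removed DropAggregateTableCommand above follows the usual IF EXISTS guard: fail only when the relation is missing and the user did not ask for IF EXISTS. A compact sketch of that guard in plain Scala, where lookupRelation is a hypothetical stand-in for the catalog lookup:

object DropGuardSketch extends App {
  // Hypothetical catalog lookup; returns None when the table is unknown.
  def lookupRelation(db: String, table: String): Option[String] = None

  def dropTable(db: String, table: String, ifExistsSet: Boolean): Unit =
    lookupRelation(db, table) match {
      case None if !ifExistsSet =>
        sys.error(s"Table $db.$table does not exist")
      case None =>
        () // IF EXISTS was given: silently succeed
      case Some(_) =>
        println(s"dropping $db.$table") // the real command triggers the drop RDD here
    }

  dropTable("default", "t1", ifExistsSet = true) // no error even though t1 is unknown
}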
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 829c487..76edc11 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, DropTable,
 import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
 
 import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.carbondata.spark.exception.MalformedCarbonCommandException
 
 
@@ -64,10 +63,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
             carbonRawScan(projectList,
               predicates,
               carbonRelation,
-              l,
-              None,
-              detailQuery = true,
-              useBinaryAggregation = false)(sqlContext)._1 :: Nil
+              l)(sqlContext) :: Nil
           }
         case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
           CarbonDictionaryDecoder(relations,
@@ -85,10 +81,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
     private def carbonRawScan(projectList: Seq[NamedExpression],
       predicates: Seq[Expression],
       relation: CarbonDatasourceRelation,
-      logicalRelation: LogicalRelation,
-      groupExprs: Option[Seq[Expression]],
-      detailQuery: Boolean,
-      useBinaryAggregation: Boolean)(sc: SQLContext): (SparkPlan, Boolean) = {
+      logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
 
       val tableName: String =
         relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
@@ -97,49 +90,32 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       val projectSet = AttributeSet(projectList.flatMap(_.references))
       val scan = CarbonScan(projectSet.toSeq,
         relation.carbonRelation,
-        predicates,
-        groupExprs,
-        useBinaryAggregation)(sqlContext)
-      val dimAggrsPresence: Boolean = scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0
+        predicates)(sqlContext)
       projectList.map {
         case attr: AttributeReference =>
         case Alias(attr: AttributeReference, _) =>
         case others =>
-          others.references
-              .map(f => scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]))
-      }
-      if (!detailQuery) {
-        if (scan.attributesNeedToDecode.size > 0) {
-          val decoder = getCarbonDecoder(logicalRelation,
-            sc,
-            tableName,
-            scan.attributesNeedToDecode.asScala.toSeq,
-            scan)
-          if (scan.unprocessedExprs.nonEmpty) {
-            val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-            (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true)
-          } else {
-            (Project(projectList, decoder), true)
+          others.references.map{f =>
+            val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name)
+            if (dictionary.isDefined && dictionary.get) {
+              scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])
+            }
           }
+      }
+      if (scan.attributesNeedToDecode.size() > 0) {
+        val decoder = getCarbonDecoder(logicalRelation,
+          sc,
+          tableName,
+          scan.attributesNeedToDecode.asScala.toSeq,
+          scan)
+        if (scan.unprocessedExprs.nonEmpty) {
+          val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
+          Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder))
         } else {
-          (scan, dimAggrsPresence)
+          Project(projectList, decoder)
         }
       } else {
-        if (scan.attributesNeedToDecode.size() > 0) {
-          val decoder = getCarbonDecoder(logicalRelation,
-            sc,
-            tableName,
-            scan.attributesNeedToDecode.asScala.toSeq,
-            scan)
-          if (scan.unprocessedExprs.nonEmpty) {
-            val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-            (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true)
-          } else {
-            (Project(projectList, decoder), true)
-          }
-        } else {
-          (Project(projectList, scan), dimAggrsPresence)
-        }
+        Project(projectList, scan)
       }
     }
 
@@ -158,9 +134,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
       val scan = CarbonScan(projectList.map(_.toAttribute),
         relation.carbonRelation,
-        predicates,
-        None,
-        useBinaryAggregator = false)(sqlContext)
+        predicates)(sqlContext)
       projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
       if (projectExprsNeedToDecode.size() > 0) {
         val decoder = getCarbonDecoder(logicalRelation,
@@ -206,18 +180,6 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         case _ => false
       }
     }
-
-    private def isGroupByPresentOnMeasures(groupingExpressions: Seq[Expression],
-      carbonTable: CarbonTable): Boolean = {
-      groupingExpressions.map { g =>
-        g.collect {
-          case attr: AttributeReference
-            if carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name) != null =>
-            return true
-        }
-      }
-      false
-    }
   }
 
   object DDLStrategies extends Strategy {
@@ -233,17 +195,17 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       case DropTable(tableName, ifNotExists)
         if CarbonEnv.getInstance(sqlContext).carbonCatalog
             .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) =>
-        ExecutedCommand(DropCubeCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil
+        ExecutedCommand(DropTableCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil
       case ShowAggregateTablesCommand(schemaName) =>
         ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil
       case ShowLoadsCommand(schemaName, cube, limit) =>
         ExecutedCommand(ShowLoads(schemaName, cube, limit, plan.output)) :: Nil
-      case LoadCube(schemaNameOp, cubeName, factPathFromUser, dimFilesPath,
+      case LoadTable(schemaNameOp, cubeName, factPathFromUser, dimFilesPath,
       partionValues, isOverwriteExist, inputSqlString) =>
         val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
             .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext)
         if (isCarbonTable || partionValues.nonEmpty) {
-          ExecutedCommand(LoadCube(schemaNameOp, cubeName, factPathFromUser,
+          ExecutedCommand(LoadTable(schemaNameOp, cubeName, factPathFromUser,
            dimFilesPath, partionValues, isOverwriteExist, inputSqlString)) :: Nil
         } else {
           ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil

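In the reworked carbonRawScan above, a referenced column is added to attributesNeedToDecode only when the relation's dictionaryMap marks it as dictionary encoded; the scan is then wrapped in a decoder, an optional filter, and a project. A simplified sketch of that selection step, with a plain Map standing in for relation.carbonRelation.metaData.dictionaryMap:

object DecodeSelectionSketch extends App {
  // Hypothetical per-column dictionary flags, mirroring dictionaryMap in the patch.
  val dictionaryMap: Map[String, Boolean] = Map("country" -> true, "salary" -> false)

  // Only dictionary-encoded columns referenced by non-trivial project expressions
  // need a decode step on top of the scan.
  def attributesNeedingDecode(referenced: Seq[String]): Seq[String] =
    referenced.filter(name => dictionaryMap.getOrElse(name, false))

  println(attributesNeedingDecode(Seq("country", "salary", "id")))
  // -> List(country): only "country" must be decoded before evaluation
}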
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 03bb23e..0f583d0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -55,17 +55,13 @@ class CarbonDecoderProcessor {
       case cd: CarbonDictionaryTempDecoder =>
         nodeList.add(Node(cd))
         process(cd.child, nodeList)
-      case j: Join =>
+      case j: BinaryNode =>
         val leftList = new util.ArrayList[AbstractNode]
         val rightList = new util.ArrayList[AbstractNode]
         nodeList.add(JoinNode(leftList, rightList))
         process(j.left, leftList)
         process(j.right, rightList)
-      case p: Project => process(p.child, nodeList)
-      case f: Filter => process(f.child, nodeList)
-      case s: Sort => process(s.child, nodeList)
-      case a: Aggregate => process(a.child, nodeList)
-      case l: Limit => process(l.child, nodeList)
+      case e: UnaryNode => process(e.child, nodeList)
       case _ =>
     }
   }

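The change above replaces five node-specific cases (Project, Filter, Sort, Aggregate, Limit) with a single match on the base traits, since all of them are unary operators and Join is a binary one. A tiny standalone sketch of the same idea over a hypothetical plan hierarchy:

object PlanWalkSketch extends App {
  // Stand-in plan hierarchy; the real code walks Catalyst's LogicalPlan, where
  // Project/Filter/Sort/Aggregate/Limit all extend UnaryNode and Join extends
  // BinaryNode, so matching on the base traits covers them all.
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class Unary(child: Plan) extends Plan
  case class Binary(left: Plan, right: Plan) extends Plan

  def collectLeaves(plan: Plan): List[String] = plan match {
    case Leaf(name)          => List(name)
    case Unary(child)        => collectLeaves(child)                         // one case instead of five
    case Binary(left, right) => collectLeaves(left) ++ collectLeaves(right)  // join-like nodes
  }

  println(collectLeaves(Binary(Unary(Leaf("t1")), Leaf("t2")))) // List(t1, t2)
}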
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index d80c065..73cb3d5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -44,13 +44,9 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
     val executedPlan: LogicalPlan = optimizer.execute(plan)
-    if (!conf.asInstanceOf[CarbonSQLConf].pushComputation) {
-      val relations = collectCarbonRelation(plan)
-      if (relations.nonEmpty) {
-        new ResolveCarbonFunctions(relations)(executedPlan)
-      } else {
-        executedPlan
-      }
+    val relations = collectCarbonRelation(plan)
+    if (relations.nonEmpty) {
+      new ResolveCarbonFunctions(relations)(executedPlan)
     } else {
       executedPlan
     }
@@ -111,19 +107,28 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
 
           case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
             val attrsOndimAggs = new util.HashSet[Attribute]
-            agg.aggregateExpressions.map { aggExp =>
-              aggExp.transform {
-                case aggExp: AggregateExpression =>
-                  collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap)
-                  aggExp
-                case a@Alias(attr: Attribute, name) =>
-                  aliasMap.put(a.toAttribute, attr)
-                  a
-              }
+            agg.aggregateExpressions.map {
+              case attr: AttributeReference =>
+              case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr)
+              case aggExp: AggregateExpression =>
+                aggExp.transform {
+                  case aggExp: AggregateExpression =>
+                    collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap)
+                    aggExp
+                  case a@Alias(attr: Attribute, name) =>
+                    aliasMap.put(a.toAttribute, attr)
+                    a
+                }
+              case others =>
+                others.collect {
+                  case attr: AttributeReference
+                    if isDictionaryEncoded(attr, relations, aliasMap) =>
+                    attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+                }
             }
             var child = agg.child
             // Incase if the child also aggregate then push down decoder to child
-            if (attrsOndimAggs.size() > 0 && !(child.equals(agg))) {
+            if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
               child = CarbonDictionaryTempDecoder(attrsOndimAggs,
                 new util.HashSet[Attribute](),
                 agg.child)

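The rewritten Aggregate case above walks each aggregate expression and records dictionary-encoded attribute references so a temporary decoder can be placed under the aggregate. A simplified standalone sketch of that collection step over a hypothetical expression tree:

object AggAttributeSketch extends App {
  // Minimal expression stand-ins; the real code pattern-matches Catalyst
  // aggregateExpressions and records dictionary-encoded attributes that a
  // CarbonDictionaryTempDecoder must cover.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Alias(child: Expr, name: String) extends Expr
  case class Agg(child: Expr) extends Expr

  val dictionaryCols = Set("country") // assumed dictionary-encoded columns

  def attrsNeedingDecode(exprs: Seq[Expr]): Set[String] = {
    def collect(e: Expr): Set[String] = e match {
      case Attr(n) if dictionaryCols(n) => Set(n)
      case Attr(_)                      => Set.empty
      case Alias(c, _)                  => collect(c)
      case Agg(c)                       => collect(c)
    }
    exprs.flatMap(collect).toSet
  }

  println(attrsNeedingDecode(Seq(Alias(Agg(Attr("country")), "cnt"), Attr("salary"))))
  // -> Set(country)
}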
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
index aee375a..cf31a7b 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
@@ -105,14 +105,19 @@ object CarbonFilters {
   def selectFilters(filters: Seq[Expression],
       attrList: java.util.HashSet[Attribute],
       aliasMap: CarbonAliasDecoderRelation): Unit = {
-    def translate(expr: Expression): Option[sources.Filter] = {
+    def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
       expr match {
-        case Or(left, right) =>
-          for {
-            leftFilter <- translate(left)
-            rightFilter <- translate(right)
-          } yield {
-            sources.Or(leftFilter, rightFilter)
+        case or@ Or(left, right) =>
+
+          val leftFilter = translate(left, true)
+          val rightFilter = translate(right, true)
+          if (leftFilter.isDefined && rightFilter.isDefined) {
+            Some( sources.Or(leftFilter.get, rightFilter.get))
+          } else {
+            or.collect {
+              case attr: AttributeReference => attrList.add(aliasMap.getOrElse(attr, attr))
+            }
+            None
           }
 
         case And(left, right) =>
@@ -151,28 +156,35 @@ object CarbonFilters {
           Some(sources.In(a.name, hSet.toArray))
 
         case others =>
-          others.collect {
-            case attr: AttributeReference =>
-              attrList.add(aliasMap.getOrElse(attr, attr))
+          if (!or) {
+            others.collect {
+              case attr: AttributeReference =>
+                attrList.add(aliasMap.getOrElse(attr, attr))
+            }
           }
           None
       }
     }
-    filters.flatMap(translate).toArray
+    filters.flatMap(translate(_, false)).toArray
   }
 
   def processExpression(exprs: Seq[Expression],
       attributesNeedToDecode: java.util.HashSet[AttributeReference],
       unprocessedExprs: ArrayBuffer[Expression],
       carbonTable: CarbonTable): Option[CarbonExpression] = {
-    def transformExpression(expr: Expression): Option[CarbonExpression] = {
+    def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
       expr match {
-        case Or(left, right) =>
-          for {
-            leftFilter <- transformExpression(left)
-            rightFilter <- transformExpression(right)
-          } yield {
-            new OrExpression(leftFilter, rightFilter)
+        case or@ Or(left, right) =>
+          val leftFilter = transformExpression(left, true)
+          val rightFilter = transformExpression(right, true)
+          if (leftFilter.isDefined && rightFilter.isDefined) {
+            Some(new OrExpression(leftFilter.get, rightFilter.get))
+          } else {
+            or.collect {
+              case attr: AttributeReference => attributesNeedToDecode.add(attr)
+            }
+            unprocessedExprs += or
+            None
           }
 
         case And(left, right) =>
@@ -220,14 +232,16 @@ object CarbonFilters {
             CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
         case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
         case others =>
-          others.collect {
-            case attr: AttributeReference => attributesNeedToDecode.add(attr)
+          if (!or) {
+            others.collect {
+              case attr: AttributeReference => attributesNeedToDecode.add(attr)
+            }
+            unprocessedExprs += others
           }
-          unprocessedExprs += others
           None
       }
     }
-    exprs.flatMap(transformExpression).reduceOption(new AndExpression(_, _))
+    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
   }
 
   private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {

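The Or handling above is all-or-nothing: an Or is translated only when both children translate; otherwise the whole branch is left for evaluation after decoding, and its attributes are recorded. A simplified sketch of that shape (not the exact CarbonFilters logic) over a hypothetical predicate tree:

object OrPushdownSketch extends App {
  // Simplified expression tree; the real code turns Catalyst expressions into
  // data-source filters and only keeps an Or when BOTH sides can be pushed.
  sealed trait Expr
  case class Pred(column: String, pushable: Boolean) extends Expr
  case class Or(left: Expr, right: Expr) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  // Returns a pushed-down representation, or None when the (sub)tree cannot be
  // pushed; a failed Or leaves the whole branch for post-scan evaluation.
  def translate(e: Expr): Option[String] = e match {
    case Pred(c, true)  => Some(c)
    case Pred(_, false) => None
    case Or(l, r) =>
      for { lf <- translate(l); rf <- translate(r) } yield s"($lf OR $rf)"
    case And(l, r) =>
      (translate(l), translate(r)) match {
        case (Some(lf), Some(rf)) => Some(s"($lf AND $rf)")
        case (lf, rf)             => lf.orElse(rf) // in this sketch, And may push one side
      }
  }

  println(translate(Or(Pred("a", true), Pred("b", false))))  // None: Or is all-or-nothing
  println(translate(And(Pred("a", true), Pred("b", false)))) // Some(a)
}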
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
index 6ee882b..cb87818 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
@@ -26,35 +26,23 @@
 package org.carbondata.spark
 
 import org.carbondata.core.load.LoadMetadataDetails
-import org.carbondata.query.carbon.result.BatchRawResult
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
 
-trait KeyVal[K, V] extends Serializable {
-  def getKey(key: CarbonKey, value: CarbonValue): (K, V)
-
-}
-
-class KeyValImpl extends KeyVal[CarbonKey, CarbonValue] {
-  override def getKey(key: CarbonKey, value: CarbonValue): (CarbonKey, CarbonValue) = (key, value)
+trait Value[V] extends Serializable {
+  def getValue(value: Array[Object]): V
 }
 
-trait RawKeyVal[K, V] extends Serializable {
-  def getKey(key: BatchRawResult, value: Any): (K, V)
-
+class ValueImpl extends Value[Array[Object]] {
+  override def getValue(value: Array[Object]): Array[Object] = value
 }
 
-class RawKeyValImpl extends RawKeyVal[BatchRawResult, Any] {
-  override def getKey(key: BatchRawResult, value: Any): (BatchRawResult, Any) = (key, value)
+trait RawValue[V] extends Serializable {
+  def getValue(value: Array[Any]): V
 }
 
-trait RawKey[K, V] extends Serializable {
-  def getKey(key: Array[Any], value: Any): (K, V)
-
+class RawValueImpl extends RawValue[Array[Any]] {
+  override def getValue(value: Array[Any]): Array[Any] = value
 }
 
-class RawKeyImpl extends RawKey[Array[Any], Any] {
-  override def getKey(key: Array[Any], value: Any): (Array[Any], Any) = (key, value)
-}
 trait Result[K, V] extends Serializable {
   def getKey(key: Int, value: LoadMetadataDetails): (K, V)
 

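For reference, the value-only traits introduced by this patch replace the old key/value pairs; the sketch below repeats them together with a trivial usage example (the ValueSketch object is illustrative only):

trait Value[V] extends Serializable {
  def getValue(value: Array[Object]): V
}

class ValueImpl extends Value[Array[Object]] {
  override def getValue(value: Array[Object]): Array[Object] = value
}

trait RawValue[V] extends Serializable {
  def getValue(value: Array[Any]): V
}

class RawValueImpl extends RawValue[Array[Any]] {
  override def getValue(value: Array[Any]): Array[Any] = value
}

object ValueSketch extends App {
  // RDDs now carry plain value arrays instead of (CarbonKey, CarbonValue) pairs.
  val raw: RawValue[Array[Any]] = new RawValueImpl
  println(raw.getValue(Array[Any](1, "a")).mkString(", ")) // identity pass-through
}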
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 4e71be1..17d83b2 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -18,23 +18,23 @@
 package org.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
-import org.carbondata.spark.KeyVal
+import org.carbondata.spark.Value
 import org.carbondata.spark.util.CarbonQueryUtil
 
 
-class CarbonCleanFilesRDD[K, V](
+class CarbonCleanFilesRDD[V: ClassTag](
     sc: SparkContext,
-    keyClass: KeyVal[K, V],
+    valueClass: Value[V],
     schemaName: String,
     cubeName: String,
     partitioner: Partitioner)
-  extends RDD[(K, V)](sc, Nil) with Logging {
+  extends RDD[V](sc, Nil) with Logging {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
@@ -43,8 +43,8 @@ class CarbonCleanFilesRDD[K, V](
     splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val iter = new Iterator[(K, V)] {
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+    val iter = new Iterator[(V)] {
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API
@@ -61,14 +61,12 @@ class CarbonCleanFilesRDD[K, V](
         !finished
       }
 
-      override def next(): (K, V) = {
+      override def next(): V = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        val row = new CarbonKey(null)
-        val value = new CarbonValue(null)
-        keyClass.getKey(row, value)
+        valueClass.getValue(null)
       }
 
     }
@@ -78,7 +76,7 @@ class CarbonCleanFilesRDD[K, V](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s(0) + s.length)
+    logInfo("Host Name : " + s.head + s.length)
     s
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0672281..af2271f 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -29,23 +29,22 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.spark.{Logging, Partition, SparkContext}
-import org.apache.spark.sql.{CarbonEnv, CarbonRelation, SQLContext}
+import org.apache.spark.sql.{CarbonEnv, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, Partitioner}
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
 import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonDataLoadSchema, CarbonTableIdentifier}
+import org.carbondata.core.carbon.CarbonDataLoadSchema
 import org.carbondata.core.carbon.datastore.block.TableBlockInfo
 import org.carbondata.core.carbon.metadata.CarbonMetadata
 import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.carbondata.core.util.CarbonUtil
 import org.carbondata.integration.spark.merger.CompactionType
 import org.carbondata.lcm.status.SegmentStatusManager
 import org.carbondata.processing.util.CarbonDataProcessorUtil
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
 import org.carbondata.spark._
 import org.carbondata.spark.load._
 import org.carbondata.spark.merger.CarbonDataMergerUtil
@@ -129,7 +128,7 @@ object CarbonDataRDDFactory extends Logging {
     if (-1 == currentRestructNumber) {
       currentRestructNumber = 0
     }
-    var segmentStatusManager = new SegmentStatusManager(cube.getAbsoluteTableIdentifier)
+    val segmentStatusManager = new SegmentStatusManager(cube.getAbsoluteTableIdentifier)
     val loadMetadataDetailsArray = segmentStatusManager.readLoadMetadata(cube.getMetaDataFilepath())
       .toList
     val resultMap = new CarbonDeleteLoadByDateRDD(
@@ -408,9 +407,8 @@ object CarbonDataRDDFactory extends Logging {
 
       // Check if any load need to be deleted before loading new data
       deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, hdfsStoreLocation,
-        false,
-        currentRestructNumber
-      )
+        isForceDeletion = false,
+        currentRestructNumber)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
         readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
       }
@@ -668,9 +666,9 @@ object CarbonDataRDDFactory extends Logging {
 
   def readLoadMetadataDetails(model: CarbonLoadModel, hdfsStoreLocation: String): Unit = {
     val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
-    var segmentStatusManager = new SegmentStatusManager(model.getCarbonDataLoadSchema.getCarbonTable
-      .
-        getAbsoluteTableIdentifier)
+    val segmentStatusManager =
+      new SegmentStatusManager(
+        model.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier)
     val details = segmentStatusManager.readLoadMetadata(metadataPath)
     model.setLoadMetadataDetails(details.toList.asJava)
   }
@@ -704,22 +702,13 @@ object CarbonDataRDDFactory extends Logging {
     }
   }
 
-  def dropAggregateTable(
+  def dropTable(
       sc: SparkContext,
       schema: String,
       cube: String,
       partitioner: Partitioner) {
-    val kv: KeyVal[CarbonKey, CarbonValue] = new KeyValImpl()
-    new CarbonDropAggregateTableRDD(sc, kv, schema, cube, partitioner).collect
-  }
-
-  def dropCube(
-      sc: SparkContext,
-      schema: String,
-      cube: String,
-      partitioner: Partitioner) {
-    val kv: KeyVal[CarbonKey, CarbonValue] = new KeyValImpl()
-    new CarbonDropCubeRDD(sc, kv, schema, cube, partitioner).collect
+    val v: Value[Array[Object]] = new ValueImpl()
+    new CarbonDropTableRDD(sc, v, schema, cube, partitioner).collect
   }
 
   def cleanFiles(
@@ -735,7 +724,7 @@ object CarbonDataRDDFactory extends Logging {
     if (-1 == currentRestructNumber) {
       currentRestructNumber = 0
     }
-    var carbonLock = CarbonLockFactory
+    val carbonLock = CarbonLockFactory
       .getCarbonLockObj(cube.getMetaDataFilepath, LockUsage.METADATA_LOCK)
     try {
       if (carbonLock.lockWithRetries()) {
@@ -744,8 +733,7 @@ object CarbonDataRDDFactory extends Logging {
           partitioner,
           hdfsStoreLocation,
           isForceDeletion = true,
-          currentRestructNumber
-        )
+          currentRestructNumber)
       }
     }
     finally {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 9fc5f9e..4616ca9 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -18,36 +18,34 @@
 package org.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
-import org.carbondata.spark.KeyVal
+import org.carbondata.spark.Value
 import org.carbondata.spark.util.CarbonQueryUtil
 
-class CarbonDeleteLoadRDD[K, V](
-                                 sc: SparkContext,
-                                 keyClass: KeyVal[K, V],
-                                 loadId: Int,
-                                 schemaName: String,
-                                 cubeName: String,
-                                 partitioner: Partitioner)
-  extends RDD[(K, V)](sc, Nil) with Logging {
+class CarbonDeleteLoadRDD[V: ClassTag](
+    sc: SparkContext,
+    valueClass: Value[V],
+    loadId: Int,
+    schemaName: String,
+    cubeName: String,
+    partitioner: Partitioner)
+  extends RDD[V](sc, Nil) with Logging {
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner)
-    val result = new Array[Partition](splits.length)
-    for (i <- 0 until result.length) {
-      result(i) = new CarbonLoadPartition(id, i, splits(i))
+    splits.zipWithIndex.map {f =>
+      new CarbonLoadPartition(id, f._2, f._1)
     }
-    result
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val iter = new Iterator[(K, V)] {
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+    val iter = new Iterator[V] {
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API
@@ -57,20 +55,18 @@ class CarbonDeleteLoadRDD[K, V](
 
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
-          finished = !false
+          finished = true
           havePair = !finished
         }
         !finished
       }
 
-      override def next(): (K, V) = {
+      override def next(): V = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        val row = new CarbonKey(null)
-        val value = new CarbonValue(null)
-        keyClass.getKey(row, value)
+        valueClass.getValue(null)
       }
 
     }
@@ -81,7 +77,7 @@ class CarbonDeleteLoadRDD[K, V](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s(0) + s.length)
+    logInfo("Host Name : " + s.head + s.length)
     s
   }
 }

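The getPartitions rewrite above replaces an index-based loop with zipWithIndex over the table splits. A small standalone sketch of that pattern, with hypothetical TableSplit/LoadPartition stand-ins for the CarbonData types:

object PartitionBuildSketch extends App {
  // Hypothetical split/partition stand-ins for illustration only.
  case class TableSplit(path: String)
  case class LoadPartition(rddId: Int, index: Int, split: TableSplit)

  // Pair each split with its position and wrap it as a partition in one pass.
  def buildPartitions(rddId: Int, splits: Array[TableSplit]): Array[LoadPartition] =
    splits.zipWithIndex.map { case (split, idx) => LoadPartition(rddId, idx, split) }

  buildPartitions(0, Array(TableSplit("part-0"), TableSplit("part-1"))).foreach(println)
}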
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala
deleted file mode 100644
index 1b57890..0000000
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
-import org.carbondata.spark.KeyVal
-import org.carbondata.spark.util.CarbonQueryUtil
-
-
-class CarbonDropAggregateTableRDD[K, V](
-    sc: SparkContext,
-    keyClass: KeyVal[K, V],
-    schemaName: String,
-    cubeName: String,
-    partitioner: Partitioner)
-  extends RDD[(K, V)](sc, Nil) with Logging {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner)
-    splits.zipWithIndex.map { s =>
-      new CarbonLoadPartition(id, s._2, s._1)
-    }
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val iter = new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-      // TODO call CARBON delete API
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        val row = new CarbonKey(null)
-        val value = new CarbonValue(null)
-        keyClass.getKey(row, value)
-      }
-    }
-    iter
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonLoadPartition]
-    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
-    s
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala
deleted file mode 100644
index d0bc5d1..0000000
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.carbondata.spark.rdd
-
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
-import org.carbondata.spark.KeyVal
-import org.carbondata.spark.util.CarbonQueryUtil
-
-class CarbonDropCubeRDD[K, V](
-    sc: SparkContext,
-    keyClass: KeyVal[K, V],
-    schemaName: String,
-    cubeName: String,
-    partitioner: Partitioner)
-  extends RDD[(K, V)](sc, Nil) with Logging {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner)
-    splits.zipWithIndex.map { s =>
-      new CarbonLoadPartition(id, s._2, s._1)
-    }
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-
-    val iter = new Iterator[(K, V)] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-
-      val partitionCount = partitioner.partitionCount
-      // TODO: Clear Btree from memory
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        val row = new CarbonKey(null)
-        val value = new CarbonValue(null)
-        keyClass.getKey(row, value)
-      }
-    }
-    iter
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala
new file mode 100644
index 0000000..513916c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+
+import org.carbondata.spark.Value
+import org.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonDropTableRDD[V: ClassTag](
+    sc: SparkContext,
+    valueClass: Value[V],
+    schemaName: String,
+    cubeName: String,
+    partitioner: Partitioner)
+  extends RDD[V](sc, Nil) with Logging {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  override def getPartitions: Array[Partition] = {
+    val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner)
+    splits.zipWithIndex.map { s =>
+      new CarbonLoadPartition(id, s._2, s._1)
+    }
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+
+    val iter = new Iterator[V] {
+      val split = theSplit.asInstanceOf[CarbonLoadPartition]
+
+      val partitionCount = partitioner.partitionCount
+      // TODO: Clear Btree from memory
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): V = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        valueClass.getValue(null)
+      }
+    }
+    iter
+  }
+}
+

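The reworked RDDs in this patch (CarbonDropTableRDD, CarbonScanRDD, and friends) take a single type parameter with a ClassTag bound, because Spark's RDD[T] itself requires an implicit ClassTag[T] to materialize typed arrays at runtime. A minimal illustration of why the bound is needed, without any Spark dependency:

import scala.reflect.ClassTag

object ClassTagSketch extends App {
  // A ClassTag is what lets a generic method allocate a typed Array[V] at
  // runtime, which is the same constraint RDD[V] places on its element type.
  def fillArray[V: ClassTag](value: V, n: Int): Array[V] = Array.fill(n)(value)

  val a: Array[String] = fillArray("segment", 3)
  println(a.mkString(", "))
}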
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index 5cb33fa..dd2a10a 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -36,7 +36,6 @@ import org.carbondata.core.util.CarbonProperties
 import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 import org.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil,
 RowResultMerger}
-import org.carbondata.query.carbon.result.{RowResult}
 import org.carbondata.query.carbon.result.iterator.RawResultIterator
 import org.carbondata.spark.MergeResult
 import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
@@ -179,7 +178,7 @@ class CarbonMergerRDD[K, V](
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
       hdfsStoreLocation, new CarbonTableIdentifier(schemaName, factTableName, tableId)
     )
-    val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) =
+    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
       QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
     val result = new util.ArrayList[Partition](defaultParallelism)
     val mapsOfNodeBlockMapping: util.List[util.Map[String, util.List[TableBlockInfo]]] = new

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index a95ae27..6693108 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -21,6 +21,7 @@ package org.carbondata.spark.rdd
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.Job
@@ -33,10 +34,10 @@ import org.carbondata.core.iterator.CarbonIterator
 import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 import org.carbondata.query.carbon.executor.QueryExecutorFactory
 import org.carbondata.query.carbon.model.QueryModel
-import org.carbondata.query.carbon.result.{BatchRawResult, RowResult}
-import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor
+import org.carbondata.query.carbon.result.BatchResult
+import org.carbondata.query.carbon.result.iterator.ChunkRowIterator
 import org.carbondata.query.expression.Expression
-import org.carbondata.spark.RawKey
+import org.carbondata.spark.RawValue
 import org.carbondata.spark.load.CarbonLoaderUtil
 import org.carbondata.spark.util.QueryPlanUtil
 
@@ -58,29 +59,29 @@ class CarbonSparkPartition(rddId: Int, val idx: Int,
   * CarbonData file, this RDD will leverage CarbonData's index information to 
do CarbonData file
   * level filtering in driver side.
   */
-class CarbonScanRDD[K, V](
+class CarbonScanRDD[V: ClassTag](
   sc: SparkContext,
   queryModel: QueryModel,
   filterExpression: Expression,
-  keyClass: RawKey[K, V],
+  keyClass: RawValue[V],
   @transient conf: Configuration,
   cubeCreationTime: Long,
   schemaLastUpdatedTime: Long,
   baseStoreLocation: String)
-  extends RDD[(K, V)](sc, Nil) with Logging {
+  extends RDD[V](sc, Nil) with Logging {
 
   val defaultParallelism = sc.defaultParallelism
 
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
-    val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) =
+    val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
       
QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
 
     val result = new util.ArrayList[Partition](defaultParallelism)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     // set filter resolver tree
     try {
-      var filterResolver = carbonInputFormat
+      val filterResolver = carbonInputFormat
         .getResolvedFilter(job.getConfiguration, filterExpression)
 
       CarbonInputFormat.setFilterPredicates(job.getConfiguration, 
filterResolver)
@@ -147,15 +148,15 @@ class CarbonScanRDD[K, V](
     result.toArray(new Array[Partition](result.size()))
   }
 
-   override def compute(thepartition: Partition, context: TaskContext): 
Iterator[(K, V)] = {
-     val LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
-     val iter = new Iterator[(K, V)] {
+   override def compute(thepartition: Partition, context: TaskContext): 
Iterator[V] = {
+     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+     val iter = new Iterator[V] {
        var rowIterator: CarbonIterator[Array[Any]] = _
        var queryStartTime: Long = 0
        try {
          val carbonSparkPartition = 
thepartition.asInstanceOf[CarbonSparkPartition]
          if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
-           queryModel.setQueryId(queryModel.getQueryId() + "_" + 
carbonSparkPartition.idx)
+           queryModel.setQueryId(queryModel.getQueryId + "_" + 
carbonSparkPartition.idx)
            // fill table block info
            queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
            queryStartTime = System.currentTimeMillis
@@ -164,13 +165,13 @@ class CarbonScanRDD[K, V](
            logInfo("*************************" + carbonPropertiesFilePath)
            if (null == carbonPropertiesFilePath) {
              System.setProperty("carbon.properties.filepath",
-               System.getProperty("user.dir") + '/' + "conf" + '/' + 
"carbon.properties");
+               System.getProperty("user.dir") + '/' + "conf" + '/' + 
"carbon.properties")
            }
            // execute query
-           rowIterator = new ChunkRawRowIterartor(
-             
QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
-                 .asInstanceOf[CarbonIterator[BatchRawResult]])
-                 .asInstanceOf[CarbonIterator[Array[Any]]]
+           rowIterator = new ChunkRowIterator(
+             
QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
+               
asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+
          }
        } catch {
          case e: Exception =>
@@ -187,19 +188,18 @@ class CarbonScanRDD[K, V](
 
        override def hasNext: Boolean = {
          if (!finished && !havePair) {
-           finished = (null == rowIterator) || (!rowIterator.hasNext())
+           finished = (null == rowIterator) || (!rowIterator.hasNext)
            havePair = !finished
          }
          !finished
        }
 
-       override def next(): (K, V) = {
+       override def next(): V = {
          if (!hasNext) {
            throw new java.util.NoSuchElementException("End of stream")
          }
          havePair = false
-         val row = rowIterator.next()
-         keyClass.getKey(row, null)
+         keyClass.getValue(rowIterator.next())
        }
 
        logInfo("********************** Total Time Taken to execute the query 
in Carbon Side: " +
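
The scan RDD is now parameterized on a single value type: RDD[(K, V)] becomes RDD[V], which is why the scala.reflect.ClassTag import appears (an RDD element type needs a ClassTag), and the per-row conversion changes from keyClass.getKey(row, null) to keyClass.getValue(row). The RawValue abstraction is defined elsewhere in the patch; judging only from how it is invoked on a CarbonIterator[Array[Any]] here, it is presumably shaped roughly like this sketch (names hypothetical, not the committed trait):

    // Sketch of the value-factory shape implied by keyClass.getValue(rowIterator.next()).
    trait RawValueSketch[V] extends Serializable {
      def getValue(row: Array[Any]): V
    }

    // A plausible implementation that hands the raw row straight back to Spark:
    class IdentityRawValue extends RawValueSketch[Array[Any]] {
      override def getValue(row: Array[Any]): Array[Any] = row
    }

Dropping the key half of the pair also removes a per-row tuple allocation on the scan path.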

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala
 
b/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala
index 81b6feb..321e0f8 100644
--- 
a/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala
+++ 
b/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier
 import org.carbondata.hadoop.CarbonInputFormat
-import org.carbondata.query.carbon.result.RowResult
 
 /**
  * All the utility functions for carbon plan creation
@@ -36,8 +35,8 @@ object QueryPlanUtil {
    * createCarbonInputFormat from query model
    */
   def createCarbonInputFormat(absoluteTableIdentifier: 
AbsoluteTableIdentifier) :
-  (CarbonInputFormat[RowResult], Job) = {
-    val carbonInputFormat = new CarbonInputFormat[RowResult]()
+  (CarbonInputFormat[Array[Object]], Job) = {
+    val carbonInputFormat = new CarbonInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new 
Path(absoluteTableIdentifier.getStorePath))
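
createCarbonInputFormat itself only pins the record type and wires up the Hadoop job. A generic variant, sketched here purely to illustrate that Array[Object] is just the type parameter the callers above now expect (hypothetical helper, not the committed utility):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

    import org.carbondata.core.carbon.AbsoluteTableIdentifier
    import org.carbondata.hadoop.CarbonInputFormat

    // Sketch: same wiring as createCarbonInputFormat, with the record type left generic.
    def createTypedCarbonInputFormat[T](identifier: AbsoluteTableIdentifier)
      : (CarbonInputFormat[T], Job) = {
      val carbonInputFormat = new CarbonInputFormat[T]()
      val job: Job = new Job(new JobConf(new Configuration))
      FileInputFormat.addInputPath(job, new Path(identifier.getStorePath))
      (carbonInputFormat, job)
    }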

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
 
b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index 4edfb7a..c2a2277 100644
--- 
a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ 
b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -64,6 +64,7 @@ class AllDataTypesTestCaseAggregate extends QueryTest with 
BeforeAndAfterAll {
       sql("select channelsId, Latest_DAY from Carbon_automation_test where 
count(channelsId) = 1").collect
     } catch {
       case ce: UnsupportedOperationException => ce.getMessage
+      case ce: Exception => ce.getMessage
     }
   }
   

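The test above now catches any Exception rather than only UnsupportedOperationException, so the invalid aggregate-in-WHERE query may fail with a different exception type without aborting the suite. A tighter variant, sketched with scala.util.Try (hypothetical rewrite, not the committed test; it assumes the same QueryTest sql helper), would also fail the test if the query unexpectedly succeeds:

    import scala.util.Try

    // Sketch: tolerate any failure mode, but assert that the query does fail.
    val outcome = Try(
      sql("select channelsId, Latest_DAY from Carbon_automation_test " +
          "where count(channelsId) = 1").collect()
    )
    assert(outcome.isFailure, "aggregate function in WHERE clause should be rejected")
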
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
----------------------------------------------------------------------
diff --git 
a/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
 
b/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
index 18b2bfe..641d18a 100644
--- 
a/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
+++ 
b/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
@@ -84,7 +84,7 @@ public class ColGroupMinMaxTest {
            }
            setMinData(data[i]);
            setMaxData(data[i]);
-           System.out.println(Arrays.toString(data[i]));
+//         System.out.println(Arrays.toString(data[i]));
        }
        mdkeyData = new byte[1000][];
        for (int i = 0; i < 1000; i++) {
