http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 55fa2bd..37abd73 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -193,26 +193,13 @@ case class 
AlterTableCompactionPostStatusUpdateEvent(sparkSession: SparkSession,
 
 /**
  * Compaction Event for handling clean up in case of any compaction failure 
and abort the
- * operation, lister has to implement this event to handle failure scenarios
- *
- * @param carbonTable
- * @param carbonMergerMapping
- * @param mergedLoadName
- */
-case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
-    carbonTable: CarbonTable,
-    carbonMergerMapping: CarbonMergerMapping,
-    mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
-
-
-/**
- * Compaction Event for handling exception in compaction
+ * operation, listener has to implement this event to handle failure scenarios
  *
  * @param sparkSession
  * @param carbonTable
  * @param alterTableModel
  */
-case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession,
+case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     alterTableModel: AlterTableModel) extends Event with 
AlterTableCompactionEventInfo
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
index 1a9c5f6..b7e9c20 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
@@ -36,11 +36,3 @@ case class CleanFilesPreEvent(carbonTable: CarbonTable, 
sparkSession: SparkSessi
  */
 case class CleanFilesPostEvent(carbonTable: CarbonTable, sparkSession: 
SparkSession)
   extends Event with CleanFilesEventInfo
-
-/**
- *
- * @param carbonTable
- * @param sparkSession
- */
-case class CleanFilesAbortEvent(carbonTable: CarbonTable, sparkSession: 
SparkSession)
-  extends Event with CleanFilesEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 5f23f77..9724fa8 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -31,11 +31,6 @@ class CarbonOption(options: Map[String, String]) {
 
   def partitionCount: String = options.getOrElse("partitionCount", "1")
 
-  def partitionClass: String = {
-    options.getOrElse("partitionClass",
-      
"org.apache.carbondata.processing.partition.impl.SampleDataPartitionerImpl")
-  }
-
   def tempCSV: Boolean = options.getOrElse("tempCSV", "false").toBoolean
 
   def compress: Boolean = options.getOrElse("compress", "false").toBoolean

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f443214..73ed769 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -254,8 +254,7 @@ object DataLoadProcessorStepOnSpark {
       while (rows.hasNext) {
         if (rowsNotExist) {
           rowsNotExist = false
-          dataHandler = 
CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
-            CarbonFactHandlerFactory.FactHandlerType.COLUMNAR)
+          dataHandler = 
CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel)
           dataHandler.initialise()
         }
         val row = dataWriter.processRow(rows.next(), dataHandler)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index cf22b3d..2ec8b9c 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -68,9 +68,6 @@ trait GenericParser {
   def parseString(input: String): Unit
 }
 
-case class DictionaryStats(distinctValues: java.util.List[String],
-    dictWriteTime: Long, sortIndexWriteTime: Long)
-
 case class PrimitiveParser(dimension: CarbonDimension,
     setOpt: Option[mutable.HashSet[String]]) extends GenericParser {
   val (hasDictEncoding, set: mutable.HashSet[String]) = setOpt match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 401ba29..df3072b 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -47,15 +47,13 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{FileFormat, 
SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, 
CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, 
CarbonInputSplitTaskInfo}
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
-import org.apache.carbondata.processing.splits.TableSplit
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, 
SparkDataTypeConverterImpl, Util}
@@ -517,16 +515,3 @@ class CarbonMergerRDD[K, V](
     theSplit.split.value.getLocations.filter(_ != "localhost")
   }
 }
-
-
-class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: 
TableSplit)
-  extends Partition {
-
-  override val index: Int = idx
-  val serializableHadoopSplit = new 
SerializableWritable[TableSplit](tableSplit)
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-case class SplitTaskInfo (splits: List[CarbonInputSplit]) extends 
CarbonInputSplit{
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 80b2d12..6b136bc 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -50,7 +50,6 @@ import 
org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, 
CSVInputFormat, CSVRecordReaderIterator}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.splits.TableSplit
 import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
@@ -103,21 +102,6 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: 
String,
   override def hashCode(): Int = 41 * (41 + rddId) + idx
 }
 
-/**
- * This partition class use to split by TableSplit
- *
- */
-class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val 
tableSplit: TableSplit,
-    val blocksDetails: Array[BlockDetails])
-  extends Partition {
-
-  override val index: Int = idx
-  val serializableHadoopSplit = new 
SerializableWritable[TableSplit](tableSplit)
-  val partitionBlocksDetail = blocksDetails
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
 class SparkPartitionLoader(model: CarbonLoadModel,
     splitIndex: Long,
     storePath: String,
@@ -140,7 +124,6 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true")
     CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", 
"true")
     CarbonProperties.getInstance().addProperty("is.compressed.keyblock", 
"false")
-    CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", 
"120000")
 
     // this property is used to determine whether temp location for carbon is 
inside
     // container temp dir or is yarn application directory.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 30c1874..28cd7ef 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -88,23 +88,6 @@ object CarbonScalaUtil {
     }
   }
 
-  def convertSparkToCarbonSchemaDataType(dataType: String): String = {
-    dataType match {
-      case CarbonCommonConstants.STRING_TYPE => CarbonCommonConstants.STRING
-      case CarbonCommonConstants.INTEGER_TYPE => CarbonCommonConstants.INTEGER
-      case CarbonCommonConstants.BYTE_TYPE => CarbonCommonConstants.INTEGER
-      case CarbonCommonConstants.SHORT_TYPE => CarbonCommonConstants.SHORT
-      case CarbonCommonConstants.LONG_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.DOUBLE_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.FLOAT_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.DECIMAL_TYPE => CarbonCommonConstants.NUMERIC
-      case CarbonCommonConstants.DATE_TYPE => CarbonCommonConstants.STRING
-      case CarbonCommonConstants.BOOLEAN_TYPE => CarbonCommonConstants.STRING
-      case CarbonCommonConstants.TIMESTAMP_TYPE => 
CarbonCommonConstants.TIMESTAMP
-      case anyType => anyType
-    }
-  }
-
   def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = 
{
     if (CarbonDataTypes.isDecimal(dataType)) {
       DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
@@ -361,77 +344,6 @@ object CarbonScalaUtil {
   }
 
   /**
-   * This method will validate a column for its data type and check whether 
the column data type
-   * can be modified and update if conditions are met
-   *
-   * @param dataTypeInfo
-   * @param carbonColumn
-   */
-  def validateColumnDataType(dataTypeInfo: DataTypeInfo, carbonColumn: 
CarbonColumn): Unit = {
-    carbonColumn.getDataType.getName match {
-      case "INT" =>
-        if (!dataTypeInfo.dataType.equals("bigint") && 
!dataTypeInfo.dataType.equals("long")) {
-          sys
-            .error(s"Given column ${ carbonColumn.getColName } with data type 
${
-              carbonColumn
-                .getDataType.getName
-            } cannot be modified. Int can only be changed to bigInt or long")
-        }
-      case "DECIMAL" =>
-        if (!dataTypeInfo.dataType.equals("decimal")) {
-          sys
-            .error(s"Given column ${ carbonColumn.getColName } with data type 
${
-              carbonColumn.getDataType.getName
-            } cannot be modified. Decimal can be only be changed to Decimal of 
higher precision")
-        }
-        if (dataTypeInfo.precision <= 
carbonColumn.getColumnSchema.getPrecision) {
-          sys
-            .error(s"Given column ${
-              carbonColumn
-                .getColName
-            } cannot be modified. Specified precision value ${
-              dataTypeInfo
-                .precision
-            } should be greater than current precision value ${
-              carbonColumn.getColumnSchema
-                .getPrecision
-            }")
-        } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) 
{
-          sys
-            .error(s"Given column ${
-              carbonColumn
-                .getColName
-            } cannot be modified. Specified scale value ${
-              dataTypeInfo
-                .scale
-            } should be greater or equal to current scale value ${
-              carbonColumn.getColumnSchema
-                .getScale
-            }")
-        } else {
-          // difference of precision and scale specified by user should not be 
less than the
-          // difference of already existing precision and scale else it will 
result in data loss
-          val carbonColumnPrecisionScaleDiff = 
carbonColumn.getColumnSchema.getPrecision -
-                                               
carbonColumn.getColumnSchema.getScale
-          val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - 
dataTypeInfo.scale
-          if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) {
-            sys
-              .error(s"Given column ${
-                carbonColumn
-                  .getColName
-              } cannot be modified. Specified precision and scale values will 
lead to data loss")
-          }
-        }
-      case _ =>
-        sys
-          .error(s"Given column ${ carbonColumn.getColName } with data type ${
-            carbonColumn
-              .getDataType.getName
-          } cannot be modified. Only Int and Decimal data types are allowed 
for modification")
-    }
-  }
-
-  /**
    * returns  all fields except tupleId field as it is not required in the 
value
    *
    * @param fields

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 2f08d07..39530f4 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -65,77 +65,6 @@ object CommonUtil {
   val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r
   val FIXED_DECIMALTYPE = """decimaltype\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r
 
-  def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String],
-      msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) {
-    val colGrpCols = colGroup.split(',').map(_.trim)
-    colGrpCols.foreach { x =>
-      // if column is no dictionary
-      if (noDictionaryDims.contains(x)) {
-        throw new MalformedCarbonCommandException(
-          "Column group is not supported for no dictionary columns:" + x)
-      } else if (msrs.exists(msr => msr.column.equals(x))) {
-        // if column is measure
-        throw new MalformedCarbonCommandException("Column group is not 
supported for measures:" + x)
-      } else if (foundIndExistingColGrp(x)) {
-        throw new MalformedCarbonCommandException("Column is available in 
other column group:" + x)
-      } else if (isComplex(x, dims)) {
-        throw new MalformedCarbonCommandException(
-          "Column group doesn't support Complex column:" + x)
-      } else if (isTimeStampColumn(x, dims)) {
-        throw new MalformedCarbonCommandException(
-          "Column group doesn't support Timestamp datatype:" + x)
-      }// if invalid column is
-      else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) {
-        // present
-        throw new MalformedCarbonCommandException(
-          "column in column group is not a valid column: " + x
-        )
-      }
-    }
-    // check if given column is present in other groups
-    def foundIndExistingColGrp(colName: String): Boolean = {
-      retrievedColGrps.foreach { colGrp =>
-        if (colGrp.split(",").contains(colName)) {
-          return true
-        }
-      }
-      false
-    }
-
-  }
-
-
-  def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = {
-    dims.foreach { dim =>
-      if (dim.column.equalsIgnoreCase(colName)) {
-        if (dim.dataType.isDefined && null != dim.dataType.get &&
-            "timestamp".equalsIgnoreCase(dim.dataType.get)) {
-          return true
-        }
-      }
-    }
-    false
-  }
-
-  def isComplex(colName: String, dims: Seq[Field]): Boolean = {
-    dims.foreach { x =>
-      if (x.children.isDefined && null != x.children.get && 
x.children.get.nonEmpty) {
-        val children = x.children.get
-        if (x.column.equals(colName)) {
-          return true
-        } else {
-          children.foreach { child =>
-            val fieldName = x.column + "." + child.column
-            if (fieldName.equalsIgnoreCase(colName)) {
-              return true
-            }
-          }
-        }
-      }
-    }
-    false
-  }
-
   def getColumnProperties(column: String,
       tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] 
= {
     val fieldProps = new util.ArrayList[ColumnProperty]()
@@ -438,38 +367,6 @@ object CommonUtil {
   }
 
   /**
-   * @param colGrps
-   * @param dims
-   * @return columns of column groups in schema order
-   */
-  def arrangeColGrpsInSchemaOrder(colGrps: Seq[String], dims: Seq[Field]): 
Seq[String] = {
-    def sortByIndex(colGrp1: String, colGrp2: String) = {
-      val firstCol1 = colGrp1.split(",")(0)
-      val firstCol2 = colGrp2.split(",")(0)
-      val dimIndex1: Int = getDimIndex(firstCol1, dims)
-      val dimIndex2: Int = getDimIndex(firstCol2, dims)
-      dimIndex1 < dimIndex2
-    }
-    val sortedColGroups: Seq[String] = colGrps.sortWith(sortByIndex)
-    sortedColGroups
-  }
-
-  /**
-   * @param colName
-   * @param dims
-   * @return return index for given column in dims
-   */
-  def getDimIndex(colName: String, dims: Seq[Field]): Int = {
-    var index: Int = -1
-    dims.zipWithIndex.foreach { h =>
-      if (h._1.column.equalsIgnoreCase(colName)) {
-        index = h._2.toInt
-      }
-    }
-    index
-  }
-
-  /**
    * validate table level properties for compaction
    *
    * @param tableProperties
@@ -692,58 +589,6 @@ object CommonUtil {
     }
   }
 
-  def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = 
{
-    /*
-    User configures both csvheadercolumns, maxcolumns,
-      if csvheadercolumns >= maxcolumns, give error
-      if maxcolumns > threashold, give error
-    User configures csvheadercolumns
-      if csvheadercolumns >= maxcolumns(default) then maxcolumns = 
csvheadercolumns+1
-      if csvheadercolumns >= threashold, give error
-    User configures nothing
-      if csvheadercolumns >= maxcolumns(default) then maxcolumns = 
csvheadercolumns+1
-      if csvheadercolumns >= threashold, give error
-     */
-    val columnCountInSchema = csvHeaders.length
-    var maxNumberOfColumnsForParsing = 0
-    val maxColumnsInt = getMaxColumnValue(maxColumns)
-    if (maxColumnsInt != null) {
-      if (columnCountInSchema >= maxColumnsInt) {
-        CarbonException.analysisException(
-          s"csv headers should be less than the max columns: $maxColumnsInt")
-      } else if (maxColumnsInt > 
CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
-        CarbonException.analysisException(
-          s"max columns cannot be greater than the threshold value: " +
-            s"${CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING}")
-      } else {
-        maxNumberOfColumnsForParsing = maxColumnsInt
-      }
-    } else if (columnCountInSchema >= 
CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
-      CarbonException.analysisException(
-        s"csv header columns should be less than max threashold: " +
-          s"${CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING}")
-    } else if (columnCountInSchema >= 
CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
-      maxNumberOfColumnsForParsing = columnCountInSchema + 1
-    } else {
-      maxNumberOfColumnsForParsing = 
CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING
-    }
-    maxNumberOfColumnsForParsing
-  }
-
-  private def getMaxColumnValue(maxColumn: String): Integer = {
-    if (maxColumn != null) {
-      try {
-        maxColumn.toInt
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Invalid value for max column in load options ${ 
e.getMessage }")
-          null
-      }
-    } else {
-      null
-    }
-  }
-
   def getPartitionInfo(columnName: String, partitionType: PartitionType,
       partitionInfo: PartitionInfo): Seq[Row] = {
     var result = Seq.newBuilder[Row]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 6cd28c0..c8af869 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -82,28 +82,6 @@ object DataTypeConverterUtil {
     }
   }
 
-  def convertToString(dataType: DataType): String = {
-    if (DataTypes.isDecimal(dataType)) {
-      "decimal"
-    } else if (DataTypes.isArrayType(dataType)) {
-      "array"
-    } else if (DataTypes.isStructType(dataType)) {
-      "struct"
-    } else {
-      dataType match {
-        case DataTypes.BOOLEAN => "boolean"
-      case DataTypes.STRING => "string"
-        case DataTypes.SHORT => "smallint"
-        case DataTypes.INT => "int"
-        case DataTypes.LONG => "bigint"
-        case DataTypes.DOUBLE => "double"
-        case DataTypes.FLOAT => "double"
-        case DataTypes.TIMESTAMP => "timestamp"
-        case DataTypes.DATE => "date"
-      }
-    }
-  }
-
   /**
    * convert from wrapper to external data type
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 5129e59..1bb3912 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -159,78 +159,6 @@ object GlobalDictionaryUtil {
     dimensionsWithDict.toArray
   }
 
-  /**
-   * invoke CarbonDictionaryWriter to write dictionary to file.
-   *
-   * @param model       instance of DictionaryLoadModel
-   * @param columnIndex the index of current column in column list
-   * @param iter        distinct value list of dictionary
-   */
-  def writeGlobalDictionaryToFile(model: DictionaryLoadModel,
-      columnIndex: Int,
-      iter: Iterator[String]): Unit = {
-    val dictService = CarbonCommonFactory.getDictionaryService
-    val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = 
new
-        DictionaryColumnUniqueIdentifier(
-      model.table,
-      model.columnIdentifier(columnIndex),
-      model.columnIdentifier(columnIndex).getDataType)
-    val writer: CarbonDictionaryWriter = dictService
-      .getDictionaryWriter(dictionaryColumnUniqueIdentifier)
-    try {
-      while (iter.hasNext) {
-        writer.write(iter.next)
-      }
-    } finally {
-      writer.close()
-    }
-  }
-
-  /**
-   * read global dictionary from cache
-   */
-  def readGlobalDictionaryFromCache(model: DictionaryLoadModel): 
HashMap[String, Dictionary] = {
-    val dictMap = new HashMap[String, Dictionary]
-    model.primDimensions.zipWithIndex.filter(f => 
model.dictFileExists(f._2)).foreach { m =>
-      val dict = CarbonLoaderUtil.getDictionary(model.table,
-        m._1.getColumnIdentifier, m._1.getDataType
-      )
-      dictMap.put(m._1.getColumnId, dict)
-    }
-    dictMap
-  }
-
-  /**
-   * invoke CarbonDictionaryReader to read dictionary from files.
-   *
-   * @param model carbon dictionary load model
-   */
-  def readGlobalDictionaryFromFile(model: DictionaryLoadModel): 
HashMap[String, HashSet[String]] = {
-    val dictMap = new HashMap[String, HashSet[String]]
-    val dictService = CarbonCommonFactory.getDictionaryService
-    for (i <- model.primDimensions.indices) {
-      val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = 
new
-          DictionaryColumnUniqueIdentifier(
-            model.table,
-            model.columnIdentifier(i),
-            model.columnIdentifier(i).getDataType)
-      val set = new HashSet[String]
-      if (model.dictFileExists(i)) {
-        val reader: CarbonDictionaryReader = dictService.getDictionaryReader(
-          dictionaryColumnUniqueIdentifier)
-        val values = reader.read
-        if (values != null) {
-          for (j <- 0 until values.size) {
-            set.add(new String(values.get(j),
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)))
-          }
-        }
-      }
-      dictMap.put(model.primDimensions(i).getColumnId, set)
-    }
-    dictMap
-  }
-
   def generateParserForChildrenDimension(dim: CarbonDimension,
       format: DataFormat,
       mapColumnValuesWithId:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 840b03f..cc8a28e 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -100,13 +100,8 @@ object StreamSinkFactory {
     val operationContext = new OperationContext
     val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
       carbonTable.getCarbonTableIdentifier,
-      carbonLoadModel,
-      carbonLoadModel.getFactFilePath,
-      false,
-      parameters.asJava,
-      parameters.asJava,
-      false
-    )
+      carbonLoadModel
+      )
     OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, 
operationContext)
     // prepare the stream segment
     val segmentId = getStreamSegmentId(carbonTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
deleted file mode 100644
index bcca7ed..0000000
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark._
-
-import org.apache.carbondata.spark.rdd.CarbonRDD
-
-
-// This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node
-
-class UpdateCoalescedRDD[T: ClassTag](
-    @transient var prev: RDD[T],
-    nodeList: Array[String])
-  extends CarbonRDD[T](prev.context, Nil, prev.sparkContext.hadoopConfiguration) {
-
-  override def getPartitions: Array[Partition] = {
-    new DataLoadPartitionCoalescer(prev, nodeList).run
-  }
-
-  override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[T] = {
-    // This iterator combines data from all the parent partitions
-    new Iterator[T] {
-      val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
-      var currentDataIter: Iterator[T] = _
-      val prevRdd = firstParent[T]
-
-      def hasNext: Boolean = {
-        while (currentDataIter == null ||
-               !currentDataIter.hasNext &&
-               parentPartitionIter.hasNext) {
-          val currentPartition = parentPartitionIter.next()
-          currentDataIter = prevRdd.compute(currentPartition, context)
-        }
-        if (currentDataIter == null) {
-          false
-        } else {
-          currentDataIter.hasNext
-        }
-      }
-
-      def next: T = {
-        currentDataIter.next()
-      }
-    }
-  }
-
-  override def getDependencies: Seq[Dependency[_]] = {
-    Seq(new NarrowDependency(prev) {
-      def getParents(id: Int): Seq[Int] = {
-        partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
-      }
-    })
-  }
-
-  override def clearDependencies() {
-    super.clearDependencies()
-    prev = null
-  }
-
-  /**
-   * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
-   * then the preferred machine will be one which most parent splits prefer too.
-   *
-   * @param partition
-   * @return the machine most preferred by split
-   */
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index d705fc9..0a0b49f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -289,13 +289,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     // column properties
     val colProps = extractColumnProperties(fields, tableProperties)
-    // get column groups configuration from table properties.
-    val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties,
-      noDictionaryDims, msrs, dims)
-    if (groupCols != null) {
-      throw new MalformedCarbonCommandException(
-        s"${CarbonCommonConstants.COLUMN_GROUPS} is deprecated")
-    }
 
     // validate the local dictionary property if defined
     if (tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined) {
@@ -409,7 +402,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       Option(varcharColumns),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
-      groupCols,
       Some(colProps),
       bucketFields: Option[BucketFields],
       partitionInfo,
@@ -488,74 +480,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   }
 
   /**
-   * Extract the column groups configuration from table properties.
-   * Based on this Row groups of fields will be determined.
-   *
-   * @param tableProperties
-   * @return
-   */
-  protected def updateColumnGroupsInField(tableProperties: mutable.Map[String, String],
-      noDictionaryDims: Seq[String],
-      msrs: Seq[Field],
-      dims: Seq[Field]): Seq[String] = {
-    if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
-
-      var splittedColGrps: Seq[String] = Seq[String]()
-      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
-
-      // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
-      // here first splitting the value by () . so that the above will be splitted into 2 strings.
-      // [col1,col2] [col3,col4]
-      val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols)
-      while (m.find()) {
-        val oneGroup: String = m.group(1)
-        CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims)
-        val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims)
-        splittedColGrps :+= arrangedColGrp
-      }
-      // This will  be furthur handled.
-      CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims)
-    } else {
-      null
-    }
-  }
-
-  def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = {
-    // if columns in column group is not in schema order than arrange it in schema order
-    var colGrpFieldIndx: Seq[Int] = Seq[Int]()
-    colGroup.split(',').map(_.trim).foreach { x =>
-      dims.zipWithIndex.foreach { dim =>
-        if (dim._1.column.equalsIgnoreCase(x)) {
-          colGrpFieldIndx :+= dim._2
-        }
-      }
-    }
-    // sort it
-    colGrpFieldIndx = colGrpFieldIndx.sorted
-    // check if columns in column group is in schema order
-    if (!checkIfInSequence(colGrpFieldIndx)) {
-      throw new MalformedCarbonCommandException("Invalid column group:" + colGroup)
-    }
-    def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = {
-      for (i <- 0 until (colGrpFieldIndx.length - 1)) {
-        if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) {
-          throw new MalformedCarbonCommandException(
-            "Invalid column group,column in group should be contiguous as per schema.")
-        }
-      }
-      true
-    }
-    val colGrpNames: StringBuilder = StringBuilder.newBuilder
-    for (i <- colGrpFieldIndx.indices) {
-      colGrpNames.append(dims(colGrpFieldIndx(i)).column)
-      if (i < (colGrpFieldIndx.length - 1)) {
-        colGrpNames.append(",")
-      }
-    }
-    colGrpNames.toString()
-  }
-
-  /**
    * @param partitionCols
    * @param tableProperties
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e50a8fd..a61f94f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -63,7 +63,6 @@ case class TableModel(
     varcharCols: Option[Seq[String]],
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
-    columnGroups: Seq[String],
     colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None,
     bucketFields: Option[BucketFields],
     partitionInfo: Option[PartitionInfo],
@@ -98,9 +97,6 @@ case class ColumnProperty(key: String, value: String)
 case class ComplexField(complexType: String, primitiveField: Option[Field],
     complexField: Option[ComplexField])
 
-case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int,
-    nodeList: Array[String])
-
 case class PartitionerField(partitionColumn: String, dataType: Option[String],
     columnComment: String)
 
@@ -512,7 +508,6 @@ object TableNewProcessor {
     val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
-    columnSchema.setColumnar(true)
     columnSchema.setDimensionColumn(isDimensionCol)
     columnSchema.setPrecision(precision)
     columnSchema.setScale(scale)
@@ -761,10 +756,6 @@ class TableNewProcessor(cm: TableModel) {
 
     val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
 
-    checkColGroupsValidity(cm.columnGroups, allColumns, highCardinalityDims)
-
-    updateColumnGroupsInFields(cm.columnGroups, allColumns)
-
     // Setting the boolean value of useInvertedIndex in column schema, if Paranet table is defined
     // Encoding is already decided above
     if (!cm.parentTable.isDefined) {
@@ -925,25 +916,4 @@ class TableNewProcessor(cm: TableModel) {
       })
     }
   }
-
-  // For updating the col group details for fields.
-  private def updateColumnGroupsInFields(colGrps: Seq[String], allCols: Seq[ColumnSchema]): Unit = {
-    if (null != colGrps) {
-      var colGroupId = -1
-      colGrps.foreach(columngroup => {
-        colGroupId += 1
-        val rowCols = columngroup.split(",")
-        rowCols.foreach(row => {
-
-          allCols.foreach(eachCol => {
-
-            if (eachCol.getColumnName.equalsIgnoreCase(row.trim)) {
-              eachCol.setColumnGroup(colGroupId)
-              eachCol.setColumnar(false)
-            }
-          })
-        })
-      })
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 44f96bd..ffaac86 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -113,12 +113,7 @@ class CarbonAppendableStreamSink(
       // in case of streaming options and optionsFinal can be same
       val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
         carbonTable.getCarbonTableIdentifier,
-        carbonLoadModel,
-        carbonLoadModel.getFactFilePath,
-        false,
-        parameters.asJava,
-        parameters.asJava,
-        false)
+        carbonLoadModel)
       OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
       checkOrHandOffSegment()
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 9b9e71d..24af8ec 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -31,6 +31,7 @@
 
   <properties>
     <dev.path>${basedir}/../../dev</dev.path>
+    <jacoco.append>true</jacoco.append>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index 0642e01..cb5a1b1 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -24,7 +24,6 @@ import java.util.List;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.MetadataProcessException;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
-import org.apache.carbondata.core.datamap.DataMapCatalog;
 import org.apache.carbondata.core.datamap.DataMapProvider;
 import org.apache.carbondata.core.datamap.DataMapRegistry;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
@@ -81,11 +80,6 @@ public class IndexDataMapProvider extends DataMapProvider {
   }
 
   @Override
-  public void initData() {
-    // Nothing is needed to do by default
-  }
-
-  @Override
   public void cleanMeta() throws IOException {
     if (getMainTable() == null) {
       throw new UnsupportedOperationException("Table need to be specified in index datamaps");
@@ -108,11 +102,6 @@ public class IndexDataMapProvider extends DataMapProvider {
     IndexDataMapRebuildRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema());
   }
 
-  @Override
-  public void incrementalBuild(String[] segmentIds) {
-    throw new UnsupportedOperationException();
-  }
-
   private DataMapFactory<? extends DataMap> createDataMapFactory()
       throws MalformedDataMapCommandException {
     CarbonTable mainTable = getMainTable();
@@ -135,12 +124,6 @@ public class IndexDataMapProvider extends DataMapProvider {
   }
 
   @Override
-  public DataMapCatalog createDataMapCatalog() {
-    // TODO create abstract class and move the default implementation there.
-    return null;
-  }
-
-  @Override
   public DataMapFactory getDataMapFactory() {
     return dataMapFactory;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index 8226f22..a1f80b1 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapCatalog;
 import org.apache.carbondata.core.datamap.DataMapProvider;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty;
@@ -74,11 +73,6 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
   }
 
   @Override
-  public void initData() {
-    // Nothing is needed to do by default
-  }
-
-  @Override
   public void cleanMeta() {
     DataMapSchema dataMapSchema = getDataMapSchema();
     dropTableCommand = new CarbonDropTableCommand(
@@ -103,15 +97,6 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
     }
   }
 
-  @Override public void incrementalBuild(String[] segmentIds) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override public DataMapCatalog createDataMapCatalog() {
-    // TODO manage pre-agg also with catalog.
-    return null;
-  }
-
   @Override
   public DataMapFactory getDataMapFactory() {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 9fdeb9d..d467017 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -109,8 +109,8 @@ case class CarbonAlterTableCompactionCommand(
       compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     } catch {
       case _: Exception =>
-        val alterTableCompactionExceptionEvent: AlterTableCompactionExceptionEvent =
-          AlterTableCompactionExceptionEvent(sparkSession, table, alterTableModel)
+        val alterTableCompactionExceptionEvent: AlterTableCompactionAbortEvent =
+          AlterTableCompactionAbortEvent(sparkSession, table, alterTableModel)
         OperationListenerBus.getInstance
           .fireEvent(alterTableCompactionExceptionEvent, operationContext)
         compactionException = operationContext.getProperty("compactionException").toString

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 6b1865a..86a2bc1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -236,12 +236,7 @@ case class CarbonLoadDataCommand(
         val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
           new LoadTablePreExecutionEvent(
             table.getCarbonTableIdentifier,
-            carbonLoadModel,
-            factPath,
-            dataFrame.isDefined,
-            optionsFinal,
-            options.asJava,
-            isOverwriteTable)
+            carbonLoadModel)
         operationContext.setProperty("isOverwrite", isOverwriteTable)
         OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
         // Add pre event listener for index datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index e80579d..97eccc6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -122,51 +122,6 @@ object TimeSeriesUtil {
   }
 
   /**
-   * Below method will be used to validate the hierarchy of time series and its value
-   * validation will be done whether hierarchy order is proper or not and hierarchy level
-   * value
-   * TODO: we should remove this method
-   *
-   * @param timeSeriesHierarchyDetails
-   * time series hierarchy string
-   */
-  @deprecated
-  def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[
-    (String, String)] = {
-    val updatedtimeSeriesHierarchyDetails = timeSeriesHierarchyDetails.toLowerCase
-    val timeSeriesHierarchy = updatedtimeSeriesHierarchyDetails.split(",")
-    val hierBuffer = timeSeriesHierarchy.map {
-      case f =>
-        val splits = f.split("=")
-        // checking hierarchy name is valid or not
-        if (!TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION.contains(splits(0).toLowerCase)) {
-          throw new MalformedCarbonCommandException(s"Not supported hierarchy type: ${ splits(0) }")
-        }
-        // validating hierarchy level is valid or not
-        if (!splits(1).equals("1")) {
-          throw new MalformedCarbonCommandException(
-            s"Unsupported Value for hierarchy:" +
-            s"${ splits(0) }=${ splits(1) }")
-        }
-        (splits(0), splits(1))
-    }
-    // checking whether hierarchy is in proper order or not
-    // get the index of first hierarchy
-    val indexOfFirstHierarchy = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
-      .indexOf(hierBuffer(0)._1.toLowerCase)
-    // now iterating through complete hierarchy to check any of the hierarchy index
-    // is less than first one
-    for (index <- 1 to hierBuffer.size - 1) {
-      val currentIndex = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION
-        .indexOf(hierBuffer(index)._1.toLowerCase)
-      if (currentIndex < indexOfFirstHierarchy) {
-        throw new MalformedCarbonCommandException(s"$timeSeriesHierarchyDetails is in wrong order")
-      }
-    }
-    hierBuffer
-  }
-
-  /**
    * Below method will be used to validate whether timeseries column present in
    * select statement or not
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 222d18d..8eb47fc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -541,7 +541,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           // default value should not be converted to lower case
           val tblProps = tblProp.get
             .map(f => if (CarbonCommonConstants.TABLE_BLOCKSIZE.equalsIgnoreCase(f._1) ||
-                          CarbonCommonConstants.COLUMN_GROUPS.equalsIgnoreCase(f._1) ||
                           CarbonCommonConstants.SORT_COLUMNS.equalsIgnoreCase(f._1) ||
                           CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE.equalsIgnoreCase(f._1) ||
                           CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD.equalsIgnoreCase(f._1)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1413fd1..ec68c1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -401,6 +401,8 @@
                 <exclude>**/*SparkUnknownExpression*.class</exclude>
                 
<exclude>**/org/apache/carbondata/cluster/sdv/generated/*</exclude>
                 
<exclude>**/org.apache.carbondata.cluster.sdv.generated.*</exclude>
+                <exclude>**/org.apache.spark.sql.test.*</exclude>
+                <exclude>**/org.apache.carbondata.format.*</exclude>
               </excludes>
               <includes>
                 <include>**/org.apache.*</include>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/pom.xml
----------------------------------------------------------------------
diff --git a/processing/pom.xml b/processing/pom.xml
index ab7c96c..d3cf77a 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -31,6 +31,7 @@
 
   <properties>
     <dev.path>${basedir}/../dev</dev.path>
+    <jacoco.append>true</jacoco.append>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
index 15ff95e..03c6c97 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
@@ -20,31 +20,7 @@ package org.apache.carbondata.processing.exception;
 public class DataLoadingException extends Exception {
   private static final long serialVersionUID = 1L;
 
-  private long errorCode = -1;
-
-  public DataLoadingException() {
-    super();
-  }
-
-  public DataLoadingException(long errorCode, String message) {
-    super(message);
-    this.errorCode = errorCode;
-  }
-
   public DataLoadingException(String message) {
     super(message);
   }
-
-  public DataLoadingException(Throwable cause) {
-    super(cause);
-  }
-
-  public DataLoadingException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public long getErrorCode() {
-    return errorCode;
-  }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
index 6fa5e2c..b59732c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
@@ -20,31 +20,7 @@ package org.apache.carbondata.processing.exception;
 public class MultipleMatchingException extends Exception {
   private static final long serialVersionUID = 1L;
 
-  private long errorCode = -1;
-
-  public MultipleMatchingException() {
-    super();
-  }
-
-  public MultipleMatchingException(long errorCode, String message) {
-    super(message);
-    this.errorCode = errorCode;
-  }
-
   public MultipleMatchingException(String message) {
     super(message);
   }
-
-  public MultipleMatchingException(Throwable cause) {
-    super(cause);
-  }
-
-  public MultipleMatchingException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public long getErrorCode() {
-    return errorCode;
-  }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
index d9640a9..a6f70c3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.processing.exception;
 
-import java.util.Locale;
-
 public class SliceMergerException extends Exception {
 
   /**
@@ -36,33 +34,12 @@ public class SliceMergerException extends Exception {
    *
    * @param msg The error message for this exception.
    */
-  public SliceMergerException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
   public SliceMergerException(String msg, Throwable t) {
     super(msg, t);
     this.msg = msg;
   }
 
   /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
    * getLocalizedMessage
    */
   @Override public String getLocalizedMessage() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
index 2d96f6c..69f79f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -63,7 +61,9 @@ public abstract class AbstractDataLoadProcessorStep {
    * The output meta for this step. The data returns from this step is as per this meta.
    *
    */
-  public abstract DataField[] getOutput();
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
 
   /**
    * Initialization process for this step.
@@ -95,32 +95,7 @@ public abstract class AbstractDataLoadProcessorStep {
    * @return Array of Iterator with data. It can be processed parallel if 
implementation class wants
    * @throws CarbonDataLoadingException
    */
-  public Iterator<CarbonRowBatch>[] execute() throws 
CarbonDataLoadingException {
-    Iterator<CarbonRowBatch>[] childIters = child.execute();
-    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
-    for (int i = 0; i < childIters.length; i++) {
-      iterators[i] = getIterator(childIters[i]);
-    }
-    return iterators;
-  }
-
-  /**
-   * Create the iterator using child iterator.
-   *
-   * @param childIter
-   * @return new iterator with step specific processing.
-   */
-  protected Iterator<CarbonRowBatch> getIterator(final 
Iterator<CarbonRowBatch> childIter) {
-    return new CarbonIterator<CarbonRowBatch>() {
-      @Override public boolean hasNext() {
-        return childIter.hasNext();
-      }
-
-      @Override public CarbonRowBatch next() {
-        return processRowBatch(childIter.next());
-      }
-    };
-  }
+  public abstract Iterator<CarbonRowBatch>[] execute() throws 
CarbonDataLoadingException;
 
   /**
    * Process the batch of rows as per the step logic.
@@ -131,20 +106,12 @@ public abstract class AbstractDataLoadProcessorStep {
   protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
     CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
     while (rowBatch.hasNext()) {
-      newBatch.addRow(processRow(rowBatch.next()));
+      newBatch.addRow(null);
     }
     return newBatch;
   }
 
   /**
-   * Process the row as per the step logic.
-   *
-   * @param row
-   * @return processed row.
-   */
-  protected abstract CarbonRow processRow(CarbonRow row);
-
-  /**
    * Get the step name for logging purpose.
    * @return Step name
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 50ebc34..1e53817 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.processing.loading.events;
 
-import java.util.Map;
-
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.events.Event;
@@ -32,24 +30,11 @@ public class LoadEvents {
   public static class LoadTablePreExecutionEvent extends Event {
     private CarbonTableIdentifier carbonTableIdentifier;
     private CarbonLoadModel carbonLoadModel;
-    private String factPath;
-    private boolean isDataFrameDefined;
-    private Map<String, String> optionsFinal;
-    // userProvidedOptions are needed if we need only the load options given 
by user
-    private Map<String, String> userProvidedOptions;
-    private boolean isOverWriteTable;
 
     public LoadTablePreExecutionEvent(CarbonTableIdentifier 
carbonTableIdentifier,
-        CarbonLoadModel carbonLoadModel, String factPath, boolean 
isDataFrameDefined,
-        Map<String, String> optionsFinal, Map<String, String> 
userProvidedOptions,
-        boolean isOverWriteTable) {
+        CarbonLoadModel carbonLoadModel) {
       this.carbonTableIdentifier = carbonTableIdentifier;
       this.carbonLoadModel = carbonLoadModel;
-      this.factPath = factPath;
-      this.isDataFrameDefined = isDataFrameDefined;
-      this.optionsFinal = optionsFinal;
-      this.userProvidedOptions = userProvidedOptions;
-      this.isOverWriteTable = isOverWriteTable;
     }
 
     public CarbonTableIdentifier getCarbonTableIdentifier() {
@@ -59,26 +44,6 @@ public class LoadEvents {
     public CarbonLoadModel getCarbonLoadModel() {
       return carbonLoadModel;
     }
-
-    public String getFactPath() {
-      return factPath;
-    }
-
-    public boolean isDataFrameDefined() {
-      return isDataFrameDefined;
-    }
-
-    public Map<String, String> getOptionsFinal() {
-      return optionsFinal;
-    }
-
-    public Map<String, String> getUserProvidedOptions() {
-      return userProvidedOptions;
-    }
-
-    public boolean isOverWriteTable() {
-      return isOverWriteTable;
-    }
   }
 
   /**
@@ -159,27 +124,4 @@ public class LoadEvents {
       return carbonLoadModel;
     }
   }
-
-  /**
-   * Class for handling clean up in case of any failure and abort the 
operation.
-   */
-
-  public static class LoadTableAbortExecutionEvent extends Event {
-    private CarbonTableIdentifier carbonTableIdentifier;
-    private CarbonLoadModel carbonLoadModel;
-    public LoadTableAbortExecutionEvent(CarbonTableIdentifier 
carbonTableIdentifier,
-        CarbonLoadModel carbonLoadModel) {
-      this.carbonTableIdentifier = carbonTableIdentifier;
-      this.carbonLoadModel = carbonLoadModel;
-    }
-
-    public CarbonTableIdentifier getCarbonTableIdentifier() {
-      return carbonTableIdentifier;
-    }
-
-    public CarbonLoadModel getCarbonLoadModel() {
-      return carbonLoadModel;
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
index 3c0fe53..b9a79ed 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
@@ -47,14 +47,6 @@ public class BadRecordFoundException extends 
CarbonDataLoadingException {
     this.msg = msg;
   }
 
-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public BadRecordFoundException(Throwable t) {
-    super(t);
-  }
 
   /**
    * getMessage

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
index 3533adb..41256a6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
@@ -40,25 +40,6 @@ public class NoRetryException extends RuntimeException {
   }
 
   /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public NoRetryException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public NoRetryException(Throwable t) {
-    super(t);
-  }
-
-  /**
    * getMessage
    */
   public String getMessage() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
index 23179fa..7b3c3df 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.processing.loading.sort;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Sort scope options
@@ -43,12 +42,8 @@ public class SortScopeOptions {
     }
   }
 
-  public static boolean isValidSortOption(String sortScope) {
-    return CarbonUtil.isValidSortOption(sortScope);
-  }
-
   public enum SortScope {
-    NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT;
+    NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index fcc88b5..74e1594 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -157,14 +157,6 @@ public class ParallelReadMergeSorterImpl extends 
AbstractMergeSorter {
    */
   private boolean processRowToNextStep(SortDataRows sortDataRows, 
SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       // start sorting
       sortDataRows.startSorting();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
index 808952b..5419e05 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
@@ -168,14 +168,6 @@ public class ParallelReadMergeSorterWithColumnRangeImpl 
extends AbstractMergeSor
    */
   private boolean processRowToNextStep(SortDataRows[] sortDataRows, 
SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       for (int i = 0; i < sortDataRows.length; i++) {
         // start sorting

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index cb72f54..5cb099e 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -309,14 +309,6 @@ public class UnsafeBatchParallelReadMergeSorterImpl 
extends AbstractMergeSorter
      */
     private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, 
SortParameters parameters)
         throws CarbonDataLoadingException {
-      if (null == sortDataRows) {
-        LOGGER.info("Record Processed For table: " + 
parameters.getTableName());
-        LOGGER.info("Number of Records was Zero");
-        String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
-        LOGGER.info(logMessage);
-        return false;
-      }
-
       try {
         // start sorting
         sortDataRows.startSorting();
@@ -333,6 +325,5 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends 
AbstractMergeSorter
         throw new CarbonDataLoadingException(e);
       }
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index e9cc986..afa30c0 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -148,14 +148,6 @@ public class UnsafeParallelReadMergeSorterImpl extends 
AbstractMergeSorter {
    */
   private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, 
SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       // start sorting
       sortDataRows.startSorting();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
index da90630..5766105 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -163,14 +163,6 @@ public class 
UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
    */
   private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows, 
SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": 
Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       for (int i = 0; i < sortDataRows.length; i++) {
         // start sorting

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index e6e24ec..0b4eae2 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -194,14 +194,6 @@ public class UnsafeIntermediateMerger {
     mergerTask.add(executorService.submit(merger));
   }
 
-  private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> 
unsafeCarbonRowPages) {
-    int totalSize = 0;
-    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
-      totalSize += unsafeCarbonRowPage.getBuffer().getActualSize();
-    }
-    return totalSize;
-  }
-
   public void finish() throws CarbonSortKeyAndGroupByException {
     try {
       executorService.shutdown();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 1224674..1a05b12 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -39,7 +39,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
 import 
org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -88,10 +87,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends 
AbstractDataLoadProces
         
CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }
 
-  @Override public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
   @Override public void initialize() throws IOException {
     super.initialize();
     child.initialize();
@@ -173,8 +168,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends 
AbstractDataLoadProces
     while (iterator.hasNext()) {
       if (rowsNotExist) {
         rowsNotExist = false;
-        dataHandler = CarbonFactHandlerFactory
-            .createCarbonFactHandler(model, 
CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+        dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
         dataHandler.initialise();
       }
       processBatch(iterator.next(), dataHandler, iteratorIndex);
@@ -297,10 +291,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends 
AbstractDataLoadProces
     rowCounter.getAndAdd(batch.getSize());
   }
 
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
   class DataWriterRunnable implements Runnable {
 
     private Iterator<CarbonRowBatch> iterator;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index 43b2278..e3bc97f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -37,6 +37,7 @@ import 
org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.FieldConverter;
 import org.apache.carbondata.processing.loading.converter.RowConverter;
 import 
org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.partition.Partitioner;
 import 
org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
 import 
org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl;
@@ -64,11 +65,6 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
   }
 
   @Override
-  public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override
   public void initialize() throws IOException {
     super.initialize();
     child.initialize();
@@ -156,14 +152,22 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
     }
   }
 
+  @Override public Iterator<CarbonRowBatch>[] execute() throws 
CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] childIters = child.execute();
+    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
+    for (int i = 0; i < childIters.length; i++) {
+      iterators[i] = getIterator(childIters[i]);
+    }
+    return iterators;
+  }
+
   /**
    * Create the iterator using child iterator.
    *
    * @param childIter
    * @return new iterator with step specific processing.
    */
-  @Override
-  protected Iterator<CarbonRowBatch> getIterator(final 
Iterator<CarbonRowBatch> childIter) {
+  private Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> 
childIter) {
     return new CarbonIterator<CarbonRowBatch>() {
       private boolean first = true;
       private RowConverter localConverter;
@@ -209,11 +213,6 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
   }
 
   @Override
-  protected CarbonRow processRow(CarbonRow row) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public void close() {
     if (!closed) {
       if (null != badRecordLogger) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 9a11266..71a624e 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -31,7 +31,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
 import 
org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -58,10 +57,6 @@ public class DataWriterBatchProcessorStepImpl extends 
AbstractDataLoadProcessorS
         
CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }
 
-  @Override public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
   @Override public void initialize() throws IOException {
     super.initialize();
     child.initialize();
@@ -94,10 +89,10 @@ public class DataWriterBatchProcessorStepImpl extends 
AbstractDataLoadProcessorS
           if (next.hasNext()) {
             DataMapWriterListener listener = getDataMapWriterListener(0);
             CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-                .createCarbonFactDataHandlerModel(configuration, 
storeLocation, 0, k++, listener);
+                .createCarbonFactDataHandlerModel(configuration, 
storeLocation, i, k++, listener);
             model.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap);
             CarbonFactHandler dataHandler = CarbonFactHandlerFactory
-                .createCarbonFactHandler(model, 
CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+                .createCarbonFactHandler(model);
             dataHandler.initialise();
             processBatch(next, dataHandler);
             finish(tableName, dataHandler);
@@ -157,9 +152,4 @@ public class DataWriterBatchProcessorStepImpl extends 
AbstractDataLoadProcessorS
     batch.close();
     rowCounter.getAndAdd(batchSize);
   }
-
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
 }

Reply via email to