http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala index 55fa2bd..37abd73 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala @@ -193,26 +193,13 @@ case class AlterTableCompactionPostStatusUpdateEvent(sparkSession: SparkSession, /** * Compaction Event for handling clean up in case of any compaction failure and abort the - * operation, lister has to implement this event to handle failure scenarios - * - * @param carbonTable - * @param carbonMergerMapping - * @param mergedLoadName - */ -case class AlterTableCompactionAbortEvent(sparkSession: SparkSession, - carbonTable: CarbonTable, - carbonMergerMapping: CarbonMergerMapping, - mergedLoadName: String) extends Event with AlterTableCompactionEventInfo - - -/** - * Compaction Event for handling exception in compaction + * * operation, lister has to implement this event to handle failure scenarios * * @param sparkSession * @param carbonTable * @param alterTableModel */ -case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession, +case class AlterTableCompactionAbortEvent(sparkSession: SparkSession, carbonTable: CarbonTable, alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
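The hunk above folds the two compaction failure events into one: AlterTableCompactionAbortEvent keeps its name but takes over the (sparkSession, carbonTable, alterTableModel) shape of the removed AlterTableCompactionExceptionEvent. A minimal sketch of a listener for the merged event follows — the listener class and its clean-up body are illustrative, and the OperationEventListener/OperationListenerBus signatures are assumed from their use elsewhere in this commit:

```scala
import org.apache.carbondata.events.{AlterTableCompactionAbortEvent, Event,
  OperationContext, OperationEventListener, OperationListenerBus}

// Hypothetical listener for the consolidated abort event. It records the
// failure outcome in the operation context so the firing command can read it.
class CompactionAbortListener extends OperationEventListener {
  override protected def onEvent(event: Event, context: OperationContext): Unit = {
    event match {
      case abort: AlterTableCompactionAbortEvent =>
        // clean up any partial compaction state for abort.carbonTable here
        context.setProperty("compactionException", "false")
      case _ => // other events are not of interest here
    }
  }
}

// Registration against the singleton bus, keyed by event class:
// OperationListenerBus.getInstance()
//   .addListener(classOf[AlterTableCompactionAbortEvent], new CompactionAbortListener)
```

CarbonAlterTableCompactionCommand (further down in this commit) reads the "compactionException" property back from the context after firing this event, which is why the sketch writes it.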
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala index 1a9c5f6..b7e9c20 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala @@ -36,11 +36,3 @@ case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSessi */ case class CleanFilesPostEvent(carbonTable: CarbonTable, sparkSession: SparkSession) extends Event with CleanFilesEventInfo - -/** - * - * @param carbonTable - * @param sparkSession - */ -case class CleanFilesAbortEvent(carbonTable: CarbonTable, sparkSession: SparkSession) - extends Event with CleanFilesEventInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala index 5f23f77..9724fa8 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala @@ -31,11 +31,6 @@ class CarbonOption(options: Map[String, String]) { def partitionCount: String = options.getOrElse("partitionCount", "1") - def partitionClass: String = { - options.getOrElse("partitionClass", - "org.apache.carbondata.processing.partition.impl.SampleDataPartitionerImpl") - } - def tempCSV: Boolean = options.getOrElse("tempCSV", "false").toBoolean def compress: Boolean = options.getOrElse("compress", "false").toBoolean http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index f443214..73ed769 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -254,8 +254,7 @@ object DataLoadProcessorStepOnSpark { while (rows.hasNext) { if (rowsNotExist) { rowsNotExist = false - dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel, - CarbonFactHandlerFactory.FactHandlerType.COLUMNAR) + dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel) dataHandler.initialise() } val row = dataWriter.processRow(rows.next(), dataHandler) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index cf22b3d..2ec8b9c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -68,9 +68,6 @@ trait GenericParser { def parseString(input: String): Unit } -case class DictionaryStats(distinctValues: java.util.List[String], - dictWriteTime: Long, sortIndexWriteTime: Long) - case class PrimitiveParser(dimension: CarbonDimension, setOpt: Option[mutable.HashSet[String]]) extends GenericParser { val (hasDictEncoding, set: mutable.HashSet[String]) = setOpt match { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 401ba29..df3072b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -47,15 +47,13 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.UpdateVO import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} -import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo} import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger._ -import org.apache.carbondata.processing.splits.TableSplit import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.MergeResult import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, SparkDataTypeConverterImpl, Util} @@ -517,16 +515,3 @@ class CarbonMergerRDD[K, V]( theSplit.split.value.getLocations.filter(_ != "localhost") } } - - -class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit) - extends Partition { - - override val index: Int = idx - val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit) - - override def hashCode(): Int = 41 * (41 + rddId) + idx -} - -case class SplitTaskInfo (splits: List[CarbonInputSplit]) extends CarbonInputSplit{ -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 80b2d12..6b136bc 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -50,7 +50,6 @@ import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator} import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.processing.splits.TableSplit import org.apache.carbondata.processing.util.CarbonQueryUtil import org.apache.carbondata.spark.DataLoadResult import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} @@ -103,21 +102,6 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String, override def hashCode(): Int = 41 * (41 + rddId) + idx } -/** - * This partition class use to split by TableSplit - * - */ -class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit, - val blocksDetails: Array[BlockDetails]) - extends Partition { - - override val index: Int = idx - val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit) - val partitionBlocksDetail = blocksDetails - - override def hashCode(): Int = 41 * (41 + rddId) + idx -} - class SparkPartitionLoader(model: CarbonLoadModel, splitIndex: Long, storePath: String, @@ -140,7 +124,6 @@ class SparkPartitionLoader(model: CarbonLoadModel, CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true") CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true") CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false") - CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000") // this property is used to determine whether temp location for carbon is inside // container temp dir or is yarn application directory. 
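With CarbonLoadPartition and CarbonTableSplitPartition removed above, the surviving partition classes all share one small pattern; a sketch of that shape (the class name is illustrative), useful when adding a new partition type:

```scala
import org.apache.spark.Partition

// The convention the remaining carbondata partitions keep (see CarbonNodePartition
// above): a stable `index`, plus a hashCode that mixes the parent RDD id with the
// partition index so partitions of different RDDs do not collide as map keys.
class ExampleCarbonPartition(rddId: Int, val idx: Int) extends Partition {
  override val index: Int = idx
  override def hashCode(): Int = 41 * (41 + rddId) + idx
}
```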
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 30c1874..28cd7ef 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -88,23 +88,6 @@ object CarbonScalaUtil { } } - def convertSparkToCarbonSchemaDataType(dataType: String): String = { - dataType match { - case CarbonCommonConstants.STRING_TYPE => CarbonCommonConstants.STRING - case CarbonCommonConstants.INTEGER_TYPE => CarbonCommonConstants.INTEGER - case CarbonCommonConstants.BYTE_TYPE => CarbonCommonConstants.INTEGER - case CarbonCommonConstants.SHORT_TYPE => CarbonCommonConstants.SHORT - case CarbonCommonConstants.LONG_TYPE => CarbonCommonConstants.NUMERIC - case CarbonCommonConstants.DOUBLE_TYPE => CarbonCommonConstants.NUMERIC - case CarbonCommonConstants.FLOAT_TYPE => CarbonCommonConstants.NUMERIC - case CarbonCommonConstants.DECIMAL_TYPE => CarbonCommonConstants.NUMERIC - case CarbonCommonConstants.DATE_TYPE => CarbonCommonConstants.STRING - case CarbonCommonConstants.BOOLEAN_TYPE => CarbonCommonConstants.STRING - case CarbonCommonConstants.TIMESTAMP_TYPE => CarbonCommonConstants.TIMESTAMP - case anyType => anyType - } - } - def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = { if (CarbonDataTypes.isDecimal(dataType)) { DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision, @@ -361,77 +344,6 @@ object CarbonScalaUtil { } /** - * This method will validate a column for its data type and check whether the column data type - * can be modified and update if conditions are met - * - * @param dataTypeInfo - * @param carbonColumn - */ - def validateColumnDataType(dataTypeInfo: DataTypeInfo, carbonColumn: CarbonColumn): Unit = { - carbonColumn.getDataType.getName match { - case "INT" => - if (!dataTypeInfo.dataType.equals("bigint") && !dataTypeInfo.dataType.equals("long")) { - sys - .error(s"Given column ${ carbonColumn.getColName } with data type ${ - carbonColumn - .getDataType.getName - } cannot be modified. Int can only be changed to bigInt or long") - } - case "DECIMAL" => - if (!dataTypeInfo.dataType.equals("decimal")) { - sys - .error(s"Given column ${ carbonColumn.getColName } with data type ${ - carbonColumn.getDataType.getName - } cannot be modified. Decimal can be only be changed to Decimal of higher precision") - } - if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) { - sys - .error(s"Given column ${ - carbonColumn - .getColName - } cannot be modified. Specified precision value ${ - dataTypeInfo - .precision - } should be greater than current precision value ${ - carbonColumn.getColumnSchema - .getPrecision - }") - } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) { - sys - .error(s"Given column ${ - carbonColumn - .getColName - } cannot be modified. 
Specified scale value ${ - dataTypeInfo - .scale - } should be greater or equal to current scale value ${ - carbonColumn.getColumnSchema - .getScale - }") - } else { - // difference of precision and scale specified by user should not be less than the - // difference of already existing precision and scale else it will result in data loss - val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision - - carbonColumn.getColumnSchema.getScale - val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale - if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) { - sys - .error(s"Given column ${ - carbonColumn - .getColName - } cannot be modified. Specified precision and scale values will lead to data loss") - } - } - case _ => - sys - .error(s"Given column ${ carbonColumn.getColName } with data type ${ - carbonColumn - .getDataType.getName - } cannot be modified. Only Int and Decimal data types are allowed for modification") - } - } - - /** * returns all fields except tupleId field as it is not required in the value * * @param fields http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 2f08d07..39530f4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -65,77 +65,6 @@ object CommonUtil { val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r val FIXED_DECIMALTYPE = """decimaltype\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r - def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String], - msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) { - val colGrpCols = colGroup.split(',').map(_.trim) - colGrpCols.foreach { x => - // if column is no dictionary - if (noDictionaryDims.contains(x)) { - throw new MalformedCarbonCommandException( - "Column group is not supported for no dictionary columns:" + x) - } else if (msrs.exists(msr => msr.column.equals(x))) { - // if column is measure - throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x) - } else if (foundIndExistingColGrp(x)) { - throw new MalformedCarbonCommandException("Column is available in other column group:" + x) - } else if (isComplex(x, dims)) { - throw new MalformedCarbonCommandException( - "Column group doesn't support Complex column:" + x) - } else if (isTimeStampColumn(x, dims)) { - throw new MalformedCarbonCommandException( - "Column group doesn't support Timestamp datatype:" + x) - }// if invalid column is - else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) { - // present - throw new MalformedCarbonCommandException( - "column in column group is not a valid column: " + x - ) - } - } - // check if given column is present in other groups - def foundIndExistingColGrp(colName: String): Boolean = { - retrievedColGrps.foreach { colGrp => - if (colGrp.split(",").contains(colName)) { - return true - } - } - false - } - - } - - - def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = { - dims.foreach { dim => - if (dim.column.equalsIgnoreCase(colName)) { - if (dim.dataType.isDefined && null != 
dim.dataType.get && - "timestamp".equalsIgnoreCase(dim.dataType.get)) { - return true - } - } - } - false - } - - def isComplex(colName: String, dims: Seq[Field]): Boolean = { - dims.foreach { x => - if (x.children.isDefined && null != x.children.get && x.children.get.nonEmpty) { - val children = x.children.get - if (x.column.equals(colName)) { - return true - } else { - children.foreach { child => - val fieldName = x.column + "." + child.column - if (fieldName.equalsIgnoreCase(colName)) { - return true - } - } - } - } - } - false - } - def getColumnProperties(column: String, tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] = { val fieldProps = new util.ArrayList[ColumnProperty]() @@ -438,38 +367,6 @@ object CommonUtil { } /** - * @param colGrps - * @param dims - * @return columns of column groups in schema order - */ - def arrangeColGrpsInSchemaOrder(colGrps: Seq[String], dims: Seq[Field]): Seq[String] = { - def sortByIndex(colGrp1: String, colGrp2: String) = { - val firstCol1 = colGrp1.split(",")(0) - val firstCol2 = colGrp2.split(",")(0) - val dimIndex1: Int = getDimIndex(firstCol1, dims) - val dimIndex2: Int = getDimIndex(firstCol2, dims) - dimIndex1 < dimIndex2 - } - val sortedColGroups: Seq[String] = colGrps.sortWith(sortByIndex) - sortedColGroups - } - - /** - * @param colName - * @param dims - * @return return index for given column in dims - */ - def getDimIndex(colName: String, dims: Seq[Field]): Int = { - var index: Int = -1 - dims.zipWithIndex.foreach { h => - if (h._1.column.equalsIgnoreCase(colName)) { - index = h._2.toInt - } - } - index - } - - /** * validate table level properties for compaction * * @param tableProperties @@ -692,58 +589,6 @@ object CommonUtil { } } - def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = { - /* - User configures both csvheadercolumns, maxcolumns, - if csvheadercolumns >= maxcolumns, give error - if maxcolumns > threashold, give error - User configures csvheadercolumns - if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1 - if csvheadercolumns >= threashold, give error - User configures nothing - if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1 - if csvheadercolumns >= threashold, give error - */ - val columnCountInSchema = csvHeaders.length - var maxNumberOfColumnsForParsing = 0 - val maxColumnsInt = getMaxColumnValue(maxColumns) - if (maxColumnsInt != null) { - if (columnCountInSchema >= maxColumnsInt) { - CarbonException.analysisException( - s"csv headers should be less than the max columns: $maxColumnsInt") - } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { - CarbonException.analysisException( - s"max columns cannot be greater than the threshold value: " + - s"${CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING}") - } else { - maxNumberOfColumnsForParsing = maxColumnsInt - } - } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { - CarbonException.analysisException( - s"csv header columns should be less than max threashold: " + - s"${CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING}") - } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { - maxNumberOfColumnsForParsing = columnCountInSchema + 1 - } else { - maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING - } - maxNumberOfColumnsForParsing - } - - private def getMaxColumnValue(maxColumn: String): Integer 
= { - if (maxColumn != null) { - try { - maxColumn.toInt - } catch { - case e: Exception => - LOGGER.error(s"Invalid value for max column in load options ${ e.getMessage }") - null - } - } else { - null - } - } - def getPartitionInfo(columnName: String, partitionType: PartitionType, partitionInfo: PartitionInfo): Seq[Row] = { var result = Seq.newBuilder[Row] http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala index 6cd28c0..c8af869 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala @@ -82,28 +82,6 @@ object DataTypeConverterUtil { } } - def convertToString(dataType: DataType): String = { - if (DataTypes.isDecimal(dataType)) { - "decimal" - } else if (DataTypes.isArrayType(dataType)) { - "array" - } else if (DataTypes.isStructType(dataType)) { - "struct" - } else { - dataType match { - case DataTypes.BOOLEAN => "boolean" - case DataTypes.STRING => "string" - case DataTypes.SHORT => "smallint" - case DataTypes.INT => "int" - case DataTypes.LONG => "bigint" - case DataTypes.DOUBLE => "double" - case DataTypes.FLOAT => "double" - case DataTypes.TIMESTAMP => "timestamp" - case DataTypes.DATE => "date" - } - } - } - /** * convert from wrapper to external data type * http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 5129e59..1bb3912 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -159,78 +159,6 @@ object GlobalDictionaryUtil { dimensionsWithDict.toArray } - /** - * invoke CarbonDictionaryWriter to write dictionary to file. 
- * - * @param model instance of DictionaryLoadModel - * @param columnIndex the index of current column in column list - * @param iter distinct value list of dictionary - */ - def writeGlobalDictionaryToFile(model: DictionaryLoadModel, - columnIndex: Int, - iter: Iterator[String]): Unit = { - val dictService = CarbonCommonFactory.getDictionaryService - val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = new - DictionaryColumnUniqueIdentifier( - model.table, - model.columnIdentifier(columnIndex), - model.columnIdentifier(columnIndex).getDataType) - val writer: CarbonDictionaryWriter = dictService - .getDictionaryWriter(dictionaryColumnUniqueIdentifier) - try { - while (iter.hasNext) { - writer.write(iter.next) - } - } finally { - writer.close() - } - } - - /** - * read global dictionary from cache - */ - def readGlobalDictionaryFromCache(model: DictionaryLoadModel): HashMap[String, Dictionary] = { - val dictMap = new HashMap[String, Dictionary] - model.primDimensions.zipWithIndex.filter(f => model.dictFileExists(f._2)).foreach { m => - val dict = CarbonLoaderUtil.getDictionary(model.table, - m._1.getColumnIdentifier, m._1.getDataType - ) - dictMap.put(m._1.getColumnId, dict) - } - dictMap - } - - /** - * invoke CarbonDictionaryReader to read dictionary from files. - * - * @param model carbon dictionary load model - */ - def readGlobalDictionaryFromFile(model: DictionaryLoadModel): HashMap[String, HashSet[String]] = { - val dictMap = new HashMap[String, HashSet[String]] - val dictService = CarbonCommonFactory.getDictionaryService - for (i <- model.primDimensions.indices) { - val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = new - DictionaryColumnUniqueIdentifier( - model.table, - model.columnIdentifier(i), - model.columnIdentifier(i).getDataType) - val set = new HashSet[String] - if (model.dictFileExists(i)) { - val reader: CarbonDictionaryReader = dictService.getDictionaryReader( - dictionaryColumnUniqueIdentifier) - val values = reader.read - if (values != null) { - for (j <- 0 until values.size) { - set.add(new String(values.get(j), - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))) - } - } - } - dictMap.put(model.primDimensions(i).getColumnId, set) - } - dictMap - } - def generateParserForChildrenDimension(dim: CarbonDimension, format: DataFormat, mapColumnValuesWithId: http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 840b03f..cc8a28e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -100,13 +100,8 @@ object StreamSinkFactory { val operationContext = new OperationContext val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent( carbonTable.getCarbonTableIdentifier, - carbonLoadModel, - carbonLoadModel.getFactFilePath, - false, - parameters.asJava, - parameters.asJava, - false - ) + carbonLoadModel + ) OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext) // prepare the stream segment val segmentId = getStreamSegmentId(carbonTable) 
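The same two-argument LoadTablePreExecutionEvent constructor is adopted here and at the other call sites later in this commit (CarbonAppendableStreamSink, CarbonLoadDataCommand). Listeners that used the removed getters now read the equivalents off the load model; a hedged sketch of that migration — getCarbonLoadModel and getFactFilePath are confirmed by this diff, the handler itself is illustrative:

```scala
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreExecutionEvent

// Before: event.getFactPath, event.getOptionsFinal, event.isOverWriteTable.
// After: everything travels inside the CarbonLoadModel carried by the event.
def handlePreLoad(event: LoadTablePreExecutionEvent): Unit = {
  val loadModel = event.getCarbonLoadModel
  val factPath = loadModel.getFactFilePath // replaces the removed event.getFactPath
  // use factPath / loadModel for listener-side validation or bookkeeping
}
```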
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala deleted file mode 100644 index bcca7ed..0000000 --- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import scala.reflect.ClassTag - -import org.apache.spark._ - -import org.apache.carbondata.spark.rdd.CarbonRDD - - -// This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node - -class UpdateCoalescedRDD[T: ClassTag]( - @transient var prev: RDD[T], - nodeList: Array[String]) - extends CarbonRDD[T](prev.context, Nil, prev.sparkContext.hadoopConfiguration) { - - override def getPartitions: Array[Partition] = { - new DataLoadPartitionCoalescer(prev, nodeList).run - } - - override def internalCompute(split: Partition, - context: TaskContext): Iterator[T] = { - // This iterator combines data from all the parent partitions - new Iterator[T] { - val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator - var currentDataIter: Iterator[T] = _ - val prevRdd = firstParent[T] - - def hasNext: Boolean = { - while (currentDataIter == null || - !currentDataIter.hasNext && - parentPartitionIter.hasNext) { - val currentPartition = parentPartitionIter.next() - currentDataIter = prevRdd.compute(currentPartition, context) - } - if (currentDataIter == null) { - false - } else { - currentDataIter.hasNext - } - } - - def next: T = { - currentDataIter.next() - } - } - } - - override def getDependencies: Seq[Dependency[_]] = { - Seq(new NarrowDependency(prev) { - def getParents(id: Int): Seq[Int] = { - partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices - } - }) - } - - override def clearDependencies() { - super.clearDependencies() - prev = null - } - - /** - * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition, - * then the preferred machine will be one which most parent splits prefer too. 
- * - * @param partition - * @return the machine most preferred by split - */ - override def getPreferredLocations(partition: Partition): Seq[String] = { - partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index d705fc9..0a0b49f 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -289,13 +289,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { // column properties val colProps = extractColumnProperties(fields, tableProperties) - // get column groups configuration from table properties. - val groupCols: Seq[String] = updateColumnGroupsInField(tableProperties, - noDictionaryDims, msrs, dims) - if (groupCols != null) { - throw new MalformedCarbonCommandException( - s"${CarbonCommonConstants.COLUMN_GROUPS} is deprecated") - } // validate the local dictionary property if defined if (tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE).isDefined) { @@ -409,7 +402,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { Option(varcharColumns), Option(noDictionaryDims), Option(noInvertedIdxCols), - groupCols, Some(colProps), bucketFields: Option[BucketFields], partitionInfo, @@ -488,74 +480,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { } /** - * Extract the column groups configuration from table properties. - * Based on this Row groups of fields will be determined. - * - * @param tableProperties - * @return - */ - protected def updateColumnGroupsInField(tableProperties: mutable.Map[String, String], - noDictionaryDims: Seq[String], - msrs: Seq[Field], - dims: Seq[Field]): Seq[String] = { - if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) { - - var splittedColGrps: Seq[String] = Seq[String]() - val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get - - // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)" - // here first splitting the value by () . so that the above will be splitted into 2 strings. - // [col1,col2] [col3,col4] - val m: Matcher = Pattern.compile("\\(([^)]+)\\)").matcher(nonSplitCols) - while (m.find()) { - val oneGroup: String = m.group(1) - CommonUtil.validateColumnGroup(oneGroup, noDictionaryDims, msrs, splittedColGrps, dims) - val arrangedColGrp = rearrangedColumnGroup(oneGroup, dims) - splittedColGrps :+= arrangedColGrp - } - // This will be furthur handled. 
- CommonUtil.arrangeColGrpsInSchemaOrder(splittedColGrps, dims) - } else { - null - } - } - - def rearrangedColumnGroup(colGroup: String, dims: Seq[Field]): String = { - // if columns in column group is not in schema order than arrange it in schema order - var colGrpFieldIndx: Seq[Int] = Seq[Int]() - colGroup.split(',').map(_.trim).foreach { x => - dims.zipWithIndex.foreach { dim => - if (dim._1.column.equalsIgnoreCase(x)) { - colGrpFieldIndx :+= dim._2 - } - } - } - // sort it - colGrpFieldIndx = colGrpFieldIndx.sorted - // check if columns in column group is in schema order - if (!checkIfInSequence(colGrpFieldIndx)) { - throw new MalformedCarbonCommandException("Invalid column group:" + colGroup) - } - def checkIfInSequence(colGrpFieldIndx: Seq[Int]): Boolean = { - for (i <- 0 until (colGrpFieldIndx.length - 1)) { - if ((colGrpFieldIndx(i + 1) - colGrpFieldIndx(i)) != 1) { - throw new MalformedCarbonCommandException( - "Invalid column group,column in group should be contiguous as per schema.") - } - } - true - } - val colGrpNames: StringBuilder = StringBuilder.newBuilder - for (i <- colGrpFieldIndx.indices) { - colGrpNames.append(dims(colGrpFieldIndx(i)).column) - if (i < (colGrpFieldIndx.length - 1)) { - colGrpNames.append(",") - } - } - colGrpNames.toString() - } - - /** * @param partitionCols * @param tableProperties */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index e50a8fd..a61f94f 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -63,7 +63,6 @@ case class TableModel( varcharCols: Option[Seq[String]], highcardinalitydims: Option[Seq[String]], noInvertedIdxCols: Option[Seq[String]], - columnGroups: Seq[String], colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None, bucketFields: Option[BucketFields], partitionInfo: Option[PartitionInfo], @@ -98,9 +97,6 @@ case class ColumnProperty(key: String, value: String) case class ComplexField(complexType: String, primitiveField: Option[Field], complexField: Option[ComplexField]) -case class Partitioner(partitionClass: String, partitionColumn: Array[String], partitionCount: Int, - nodeList: Array[String]) - case class PartitionerField(partitionColumn: String, dataType: Option[String], columnComment: String) @@ -512,7 +508,6 @@ object TableNewProcessor { val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema) columnSchema.setColumnUniqueId(columnUniqueId) columnSchema.setColumnReferenceId(columnUniqueId) - columnSchema.setColumnar(true) columnSchema.setDimensionColumn(isDimensionCol) columnSchema.setPrecision(precision) columnSchema.setScale(scale) @@ -761,10 +756,6 @@ class TableNewProcessor(cm: TableModel) { val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq()) - checkColGroupsValidity(cm.columnGroups, allColumns, highCardinalityDims) - - updateColumnGroupsInFields(cm.columnGroups, allColumns) - // Setting the boolean value of useInvertedIndex in column schema, if Paranet table is defined 
// Encoding is already decided above if (!cm.parentTable.isDefined) { @@ -925,25 +916,4 @@ class TableNewProcessor(cm: TableModel) { }) } } - - // For updating the col group details for fields. - private def updateColumnGroupsInFields(colGrps: Seq[String], allCols: Seq[ColumnSchema]): Unit = { - if (null != colGrps) { - var colGroupId = -1 - colGrps.foreach(columngroup => { - colGroupId += 1 - val rowCols = columngroup.split(",") - rowCols.foreach(row => { - - allCols.foreach(eachCol => { - - if (eachCol.getColumnName.equalsIgnoreCase(row.trim)) { - eachCol.setColumnGroup(colGroupId) - eachCol.setColumnar(false) - } - }) - }) - }) - } - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 44f96bd..ffaac86 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -113,12 +113,7 @@ class CarbonAppendableStreamSink( // in case of streaming options and optionsFinal can be same val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent( carbonTable.getCarbonTableIdentifier, - carbonLoadModel, - carbonLoadModel.getFactFilePath, - false, - parameters.asJava, - parameters.asJava, - false) + carbonLoadModel) OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext) checkOrHandOffSegment() http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index 9b9e71d..24af8ec 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -31,6 +31,7 @@ <properties> <dev.path>${basedir}/../../dev</dev.path> + <jacoco.append>true</jacoco.append> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java index 0642e01..cb5a1b1 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.exceptions.MetadataProcessException; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; -import org.apache.carbondata.core.datamap.DataMapCatalog; import org.apache.carbondata.core.datamap.DataMapProvider; import org.apache.carbondata.core.datamap.DataMapRegistry; import org.apache.carbondata.core.datamap.DataMapStoreManager; @@ -81,11 +80,6 @@ public class IndexDataMapProvider extends 
DataMapProvider { } @Override - public void initData() { - // Nothing is needed to do by default - } - - @Override public void cleanMeta() throws IOException { if (getMainTable() == null) { throw new UnsupportedOperationException("Table need to be specified in index datamaps"); @@ -108,11 +102,6 @@ public class IndexDataMapProvider extends DataMapProvider { IndexDataMapRebuildRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema()); } - @Override - public void incrementalBuild(String[] segmentIds) { - throw new UnsupportedOperationException(); - } - private DataMapFactory<? extends DataMap> createDataMapFactory() throws MalformedDataMapCommandException { CarbonTable mainTable = getMainTable(); @@ -135,12 +124,6 @@ public class IndexDataMapProvider extends DataMapProvider { } @Override - public DataMapCatalog createDataMapCatalog() { - // TODO create abstract class and move the default implementation there. - return null; - } - - @Override public DataMapFactory getDataMapFactory() { return dataMapFactory; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java index 8226f22..a1f80b1 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.DataMapCatalog; import org.apache.carbondata.core.datamap.DataMapProvider; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty; @@ -74,11 +73,6 @@ public class PreAggregateDataMapProvider extends DataMapProvider { } @Override - public void initData() { - // Nothing is needed to do by default - } - - @Override public void cleanMeta() { DataMapSchema dataMapSchema = getDataMapSchema(); dropTableCommand = new CarbonDropTableCommand( @@ -103,15 +97,6 @@ public class PreAggregateDataMapProvider extends DataMapProvider { } } - @Override public void incrementalBuild(String[] segmentIds) { - throw new UnsupportedOperationException(); - } - - @Override public DataMapCatalog createDataMapCatalog() { - // TODO manage pre-agg also with catalog. 
- return null; - } - @Override public DataMapFactory getDataMapFactory() { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 9fdeb9d..d467017 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -109,8 +109,8 @@ case class CarbonAlterTableCompactionCommand( compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase) } catch { case _: Exception => - val alterTableCompactionExceptionEvent: AlterTableCompactionExceptionEvent = - AlterTableCompactionExceptionEvent(sparkSession, table, alterTableModel) + val alterTableCompactionExceptionEvent: AlterTableCompactionAbortEvent = + AlterTableCompactionAbortEvent(sparkSession, table, alterTableModel) OperationListenerBus.getInstance .fireEvent(alterTableCompactionExceptionEvent, operationContext) compactionException = operationContext.getProperty("compactionException").toString http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 6b1865a..86a2bc1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -236,12 +236,7 @@ case class CarbonLoadDataCommand( val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = new LoadTablePreExecutionEvent( table.getCarbonTableIdentifier, - carbonLoadModel, - factPath, - dataFrame.isDefined, - optionsFinal, - options.asJava, - isOverwriteTable) + carbonLoadModel) operationContext.setProperty("isOverwrite", isOverwriteTable) OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext) // Add pre event listener for index datamap http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala index e80579d..97eccc6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -122,51 +122,6 @@ object TimeSeriesUtil { } /** - * Below method will be used to validate the hierarchy of time series and its value - * validation will be done whether hierarchy order is proper or not and hierarchy level - * value - * TODO: we should remove this method - * - * @param timeSeriesHierarchyDetails - * time series hierarchy string - */ - @deprecated - def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[ - (String, String)] = { - val updatedtimeSeriesHierarchyDetails = timeSeriesHierarchyDetails.toLowerCase - val timeSeriesHierarchy = updatedtimeSeriesHierarchyDetails.split(",") - val hierBuffer = timeSeriesHierarchy.map { - case f => - val splits = f.split("=") - // checking hierarchy name is valid or not - if (!TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION.contains(splits(0).toLowerCase)) { - throw new MalformedCarbonCommandException(s"Not supported hierarchy type: ${ splits(0) }") - } - // validating hierarchy level is valid or not - if (!splits(1).equals("1")) { - throw new MalformedCarbonCommandException( - s"Unsupported Value for hierarchy:" + - s"${ splits(0) }=${ splits(1) }") - } - (splits(0), splits(1)) - } - // checking whether hierarchy is in proper order or not - // get the index of first hierarchy - val indexOfFirstHierarchy = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION - .indexOf(hierBuffer(0)._1.toLowerCase) - // now iterating through complete hierarchy to check any of the hierarchy index - // is less than first one - for (index <- 1 to hierBuffer.size - 1) { - val currentIndex = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION - .indexOf(hierBuffer(index)._1.toLowerCase) - if (currentIndex < indexOfFirstHierarchy) { - throw new MalformedCarbonCommandException(s"$timeSeriesHierarchyDetails is in wrong order") - } - } - hierBuffer - } - - /** * Below method will be used to validate whether timeseries column present in * select statement or not * http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 222d18d..8eb47fc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -541,7 +541,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { // default value should not be converted to lower case val tblProps = tblProp.get .map(f => if (CarbonCommonConstants.TABLE_BLOCKSIZE.equalsIgnoreCase(f._1) || - CarbonCommonConstants.COLUMN_GROUPS.equalsIgnoreCase(f._1) || CarbonCommonConstants.SORT_COLUMNS.equalsIgnoreCase(f._1) || CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE.equalsIgnoreCase(f._1) || CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD.equalsIgnoreCase(f._1)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1413fd1..ec68c1d 100644 --- a/pom.xml +++ b/pom.xml @@ -401,6 +401,8 @@ <exclude>**/*SparkUnknownExpression*.class</exclude> 
<exclude>**/org/apache/carbondata/cluster/sdv/generated/*</exclude> <exclude>**/org.apache.carbondata.cluster.sdv.generated.*</exclude> + <exclude>**/org.apache.spark.sql.test.*</exclude> + <exclude>**/org.apache.carbondata.format.*</exclude> </excludes> <includes> <include>**/org.apache.*</include> http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/pom.xml ---------------------------------------------------------------------- diff --git a/processing/pom.xml b/processing/pom.xml index ab7c96c..d3cf77a 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -31,6 +31,7 @@ <properties> <dev.path>${basedir}/../dev</dev.path> + <jacoco.append>true</jacoco.append> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java index 15ff95e..03c6c97 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java +++ b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java @@ -20,31 +20,7 @@ package org.apache.carbondata.processing.exception; public class DataLoadingException extends Exception { private static final long serialVersionUID = 1L; - private long errorCode = -1; - - public DataLoadingException() { - super(); - } - - public DataLoadingException(long errorCode, String message) { - super(message); - this.errorCode = errorCode; - } - public DataLoadingException(String message) { super(message); } - - public DataLoadingException(Throwable cause) { - super(cause); - } - - public DataLoadingException(String message, Throwable cause) { - super(message, cause); - } - - public long getErrorCode() { - return errorCode; - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java index 6fa5e2c..b59732c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java +++ b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java @@ -20,31 +20,7 @@ package org.apache.carbondata.processing.exception; public class MultipleMatchingException extends Exception { private static final long serialVersionUID = 1L; - private long errorCode = -1; - - public MultipleMatchingException() { - super(); - } - - public MultipleMatchingException(long errorCode, String message) { - super(message); - this.errorCode = errorCode; - } - public MultipleMatchingException(String message) { super(message); } - - public MultipleMatchingException(Throwable cause) { - super(cause); - } - - public MultipleMatchingException(String message, Throwable cause) { - super(message, cause); - } - - public long getErrorCode() { - return errorCode; - } - } \ No newline at end of file 
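Both exception classes above are reduced to the single String constructor, so call sites that previously wrapped a cause must now fold it into the message; an illustrative sketch (the helper is hypothetical):

```scala
import org.apache.carbondata.processing.exception.DataLoadingException

// With the (Throwable) and (String, Throwable) constructors removed, a cause
// can only be propagated through the message text:
def rethrowAsLoadingFailure(step: String, cause: Throwable): Nothing =
  throw new DataLoadingException(s"$step failed: ${cause.getMessage}")
```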
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java index d9640a9..a6f70c3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java +++ b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java @@ -17,8 +17,6 @@ package org.apache.carbondata.processing.exception; -import java.util.Locale; - public class SliceMergerException extends Exception { /** @@ -36,33 +34,12 @@ public class SliceMergerException extends Exception { * * @param msg The error message for this exception. */ - public SliceMergerException(String msg) { - super(msg); - this.msg = msg; - } - - /** - * Constructor - * - * @param msg The error message for this exception. - */ public SliceMergerException(String msg, Throwable t) { super(msg, t); this.msg = msg; } /** - * This method is used to get the localized message. - * - * @param locale - A Locale object represents a specific geographical, - * political, or cultural region. - * @return - Localized error message. - */ - public String getLocalizedMessage(Locale locale) { - return ""; - } - - /** * getLocalizedMessage */ @Override public String getLocalizedMessage() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java index 2d96f6c..69f79f8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java @@ -21,10 +21,8 @@ import java.io.IOException; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; -import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; @@ -63,7 +61,9 @@ public abstract class AbstractDataLoadProcessorStep { * The output meta for this step. The data returns from this step is as per this meta. * */ - public abstract DataField[] getOutput(); + public DataField[] getOutput() { + return child.getOutput(); + } /** * Initialization process for this step. @@ -95,32 +95,7 @@ public abstract class AbstractDataLoadProcessorStep { * @return Array of Iterator with data. 
   * @return Array of Iterator with data. It can be processed parallel if implementation class wants
   * @throws CarbonDataLoadingException
   */
-  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
-    Iterator<CarbonRowBatch>[] childIters = child.execute();
-    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
-    for (int i = 0; i < childIters.length; i++) {
-      iterators[i] = getIterator(childIters[i]);
-    }
-    return iterators;
-  }
-
-  /**
-   * Create the iterator using child iterator.
-   *
-   * @param childIter
-   * @return new iterator with step specific processing.
-   */
-  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
-    return new CarbonIterator<CarbonRowBatch>() {
-      @Override public boolean hasNext() {
-        return childIter.hasNext();
-      }
-
-      @Override public CarbonRowBatch next() {
-        return processRowBatch(childIter.next());
-      }
-    };
-  }
+  public abstract Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException;

   /**
   * Process the batch of rows as per the step logic.
@@ -131,20 +106,12 @@ public abstract class AbstractDataLoadProcessorStep {
   protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
     CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
     while (rowBatch.hasNext()) {
-      newBatch.addRow(processRow(rowBatch.next()));
+      newBatch.addRow(null);
     }
     return newBatch;
   }

   /**
-   * Process the row as per the step logic.
-   *
-   * @param row
-   * @return processed row.
-   */
-  protected abstract CarbonRow processRow(CarbonRow row);
-
-  /**
    * Get the step name for logging purpose.
    * @return Step name
    */
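This refactor inverts the base-class contract: execute() becomes abstract, getOutput() gains a default that simply delegates to the child step, and the per-row hooks (getIterator, processRow) leave the base class, with the remaining default processRowBatch body reduced to a placeholder that adds null. A minimal sketch of a subclass under the new contract (PassThroughStepSketch is hypothetical, not commit code):

    import java.util.Iterator;

    import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
    import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
    import org.apache.carbondata.processing.loading.row.CarbonRowBatch;

    public class PassThroughStepSketch extends AbstractDataLoadProcessorStep {

      public PassThroughStepSketch(CarbonDataLoadConfiguration configuration,
          AbstractDataLoadProcessorStep child) {
        super(configuration, child);
      }

      // execute() is abstract now, so even a step with no per-row work has to
      // forward the child's iterators itself.
      @Override
      public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
        return child.execute();
      }

      // No getOutput() override needed: the base class now returns
      // child.getOutput() by default.

      @Override
      protected String getStepName() {
        return "Pass Through Step";
      }
    }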
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 50ebc34..1e53817 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -17,8 +17,6 @@

 package org.apache.carbondata.processing.loading.events;

-import java.util.Map;
-
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.events.Event;
@@ -32,24 +30,11 @@ public class LoadEvents {
   public static class LoadTablePreExecutionEvent extends Event {
     private CarbonTableIdentifier carbonTableIdentifier;
     private CarbonLoadModel carbonLoadModel;
-    private String factPath;
-    private boolean isDataFrameDefined;
-    private Map<String, String> optionsFinal;
-    // userProvidedOptions are needed if we need only the load options given by user
-    private Map<String, String> userProvidedOptions;
-    private boolean isOverWriteTable;

     public LoadTablePreExecutionEvent(CarbonTableIdentifier carbonTableIdentifier,
-        CarbonLoadModel carbonLoadModel, String factPath, boolean isDataFrameDefined,
-        Map<String, String> optionsFinal, Map<String, String> userProvidedOptions,
-        boolean isOverWriteTable) {
+        CarbonLoadModel carbonLoadModel) {
       this.carbonTableIdentifier = carbonTableIdentifier;
       this.carbonLoadModel = carbonLoadModel;
-      this.factPath = factPath;
-      this.isDataFrameDefined = isDataFrameDefined;
-      this.optionsFinal = optionsFinal;
-      this.userProvidedOptions = userProvidedOptions;
-      this.isOverWriteTable = isOverWriteTable;
     }

     public CarbonTableIdentifier getCarbonTableIdentifier() {
@@ -59,26 +44,6 @@ public class LoadEvents {
     public CarbonLoadModel getCarbonLoadModel() {
       return carbonLoadModel;
     }
-
-    public String getFactPath() {
-      return factPath;
-    }
-
-    public boolean isDataFrameDefined() {
-      return isDataFrameDefined;
-    }
-
-    public Map<String, String> getOptionsFinal() {
-      return optionsFinal;
-    }
-
-    public Map<String, String> getUserProvidedOptions() {
-      return userProvidedOptions;
-    }
-
-    public boolean isOverWriteTable() {
-      return isOverWriteTable;
-    }
   }

   /**
@@ -159,27 +124,4 @@
       return carbonLoadModel;
     }
   }
-
-  /**
-   * Class for handling clean up in case of any failure and abort the operation.
-   */
-
-  public static class LoadTableAbortExecutionEvent extends Event {
-    private CarbonTableIdentifier carbonTableIdentifier;
-    private CarbonLoadModel carbonLoadModel;
-    public LoadTableAbortExecutionEvent(CarbonTableIdentifier carbonTableIdentifier,
-        CarbonLoadModel carbonLoadModel) {
-      this.carbonTableIdentifier = carbonTableIdentifier;
-      this.carbonLoadModel = carbonLoadModel;
-    }
-
-    public CarbonTableIdentifier getCarbonTableIdentifier() {
-      return carbonTableIdentifier;
-    }
-
-    public CarbonLoadModel getCarbonLoadModel() {
-      return carbonLoadModel;
-    }
-  }
-
 }
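LoadTablePreExecutionEvent shrinks to the table identifier plus the load model, and LoadTableAbortExecutionEvent disappears, so listeners no longer get a dedicated abort hook. A sketch of a firing site after the change, assuming the usual OperationListenerBus wiring (the helper method and variable names are illustrative):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.events.OperationContext;
    import org.apache.carbondata.events.OperationListenerBus;
    import org.apache.carbondata.processing.loading.events.LoadEvents;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    public class PreLoadEventSketch {
      // Illustrative firing site: only the identifier and the model survive
      // the constructor change; fact path, final options and the overwrite
      // flag are no longer carried by the event.
      static void firePreLoadEvent(CarbonTable table, CarbonLoadModel loadModel,
          OperationContext context) throws Exception {
        LoadEvents.LoadTablePreExecutionEvent event =
            new LoadEvents.LoadTablePreExecutionEvent(
                table.getCarbonTableIdentifier(), loadModel);
        OperationListenerBus.getInstance().fireEvent(event, context);
      }
    }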
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
index 3c0fe53..b9a79ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
@@ -47,14 +47,6 @@ public class BadRecordFoundException extends CarbonDataLoadingException {
     this.msg = msg;
   }

-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public BadRecordFoundException(Throwable t) {
-    super(t);
-  }

   /**
    * getMessage

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
index 3533adb..41256a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
@@ -40,25 +40,6 @@ public class NoRetryException extends RuntimeException {
   }

   /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public NoRetryException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public NoRetryException(Throwable t) {
-    super(t);
-  }
-
-  /**
    * getMessage
    */
   public String getMessage() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
index 23179fa..7b3c3df 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortScopeOptions.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.processing.loading.sort;

 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;

 /**
  * Sort scope options
@@ -43,12 +42,8 @@ public class SortScopeOptions {
     }
   }

-  public static boolean isValidSortOption(String sortScope) {
-    return CarbonUtil.isValidSortOption(sortScope);
-  }
-
   public enum SortScope {
-    NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT;
+    NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT
   }
 }
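Per its deleted body, SortScopeOptions.isValidSortOption was a straight delegation, so the validation entry point is now CarbonUtil itself; the enum change is only the removal of a redundant trailing semicolon. A tiny sketch of the caller-side replacement:

    import org.apache.carbondata.core.util.CarbonUtil;

    public class SortOptionCheckSketch {
      public static void main(String[] args) {
        // The removed SortScopeOptions.isValidSortOption simply delegated here.
        System.out.println("LOCAL_SORT valid: " + CarbonUtil.isValidSortOption("LOCAL_SORT"));
      }
    }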
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index fcc88b5..74e1594 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -157,14 +157,6 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
    */
   private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       // start sorting
       sortDataRows.startSorting();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
index 808952b..5419e05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
@@ -168,14 +168,6 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor
    */
   private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       for (int i = 0; i < sortDataRows.length; i++) {
         // start sorting

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index cb72f54..5cb099e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -309,14 +309,6 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
    */
   private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       // start sorting
       sortDataRows.startSorting();
@@ -333,6 +325,5 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
         throw new CarbonDataLoadingException(e);
       }
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index e9cc986..afa30c0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -148,14 +148,6 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
    */
   private boolean processRowToNextStep(UnsafeSortDataRows sortDataRows, SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       // start sorting
       sortDataRows.startSorting();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
index da90630..5766105 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -163,14 +163,6 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
    */
   private boolean processRowToNextStep(UnsafeSortDataRows[] sortDataRows,
       SortParameters parameters)
       throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
     try {
       for (int i = 0; i < sortDataRows.length; i++) {
         // start sorting

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index e6e24ec..0b4eae2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -194,14 +194,6 @@ public class UnsafeIntermediateMerger {
     mergerTask.add(executorService.submit(merger));
   }

-  private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> unsafeCarbonRowPages) {
-    int totalSize = 0;
-    for (UnsafeCarbonRowPage unsafeCarbonRowPage : unsafeCarbonRowPages) {
-      totalSize += unsafeCarbonRowPage.getBuffer().getActualSize();
-    }
-    return totalSize;
-  }
-
   public void finish() throws CarbonSortKeyAndGroupByException {
     try {
       executorService.shutdown();
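The same zero-record guard is deleted from all five sorter variants above, which suggests the callers already guarantee a constructed, initialized sort buffer by the time processRowToNextStep runs. A sketch of that setup ordering, modeled on how these sorters build their buffers during initialization (class and method names here are illustrative; the SortDataRows API is assumed from its use in these files):

    import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
    import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
    import org.apache.carbondata.processing.sort.sortdata.SortParameters;

    public class SorterSetupSketch {
      // The buffer exists before any batch flows, so the deleted
      // "null == sortDataRows" branch had no caller that could trigger it.
      static SortDataRows newSortBuffer(SortParameters parameters,
          SortIntermediateFileMerger merger) throws Exception {
        SortDataRows sortDataRows = new SortDataRows(parameters, merger);
        sortDataRows.initialize();
        return sortDataRows;
      }
    }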
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 1224674..1a05b12 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -39,7 +39,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -88,10 +87,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
         CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }

-  @Override public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
   @Override public void initialize() throws IOException {
     super.initialize();
     child.initialize();
@@ -173,8 +168,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     while (iterator.hasNext()) {
       if (rowsNotExist) {
         rowsNotExist = false;
-        dataHandler = CarbonFactHandlerFactory
-            .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+        dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
         dataHandler.initialise();
       }
       processBatch(iterator.next(), dataHandler, iteratorIndex);
@@ -297,10 +291,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     rowCounter.getAndAdd(batch.getSize());
   }

-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
   class DataWriterRunnable implements Runnable {

     private Iterator<CarbonRowBatch> iterator;
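CarbonFactHandlerFactory.createCarbonFactHandler is now called with only the model; the FactHandlerType.COLUMNAR enum argument is dropped at every call site in this commit. A sketch of the new call shape (the try/finally cleanup mirrors how these steps usually close the handler and is assumed, not shown in the hunks above):

    import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    import org.apache.carbondata.processing.store.CarbonFactHandler;
    import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;

    public class FactHandlerSketch {
      static void writeWith(CarbonFactDataHandlerModel model) {
        // Single-argument factory call after this change.
        CarbonFactHandler handler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
        handler.initialise();
        try {
          // feed rows via handler.addDataToStore(...) and call handler.finish()
        } finally {
          handler.closeHandler(); // assumed cleanup call, as in the loading steps
        }
      }
    }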
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index 43b2278..e3bc97f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.FieldConverter;
 import org.apache.carbondata.processing.loading.converter.RowConverter;
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.partition.Partitioner;
 import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
 import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerImpl;
@@ -64,11 +65,6 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   }

   @Override
-  public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override
   public void initialize() throws IOException {
     super.initialize();
     child.initialize();
@@ -156,14 +152,22 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     }
   }

+  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] childIters = child.execute();
+    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
+    for (int i = 0; i < childIters.length; i++) {
+      iterators[i] = getIterator(childIters[i]);
+    }
+    return iterators;
+  }
+
   /**
    * Create the iterator using child iterator.
    *
    * @param childIter
    * @return new iterator with step specific processing.
    */
-  @Override
-  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+  private Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
     return new CarbonIterator<CarbonRowBatch>() {
       private boolean first = true;
       private RowConverter localConverter;
@@ -209,11 +213,6 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   }

   @Override
-  protected CarbonRow processRow(CarbonRow row) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public void close() {
     if (!closed) {
       if (null != badRecordLogger) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 9a11266..71a624e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -31,7 +31,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -58,10 +57,6 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
         CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
   }

-  @Override public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
   @Override public void initialize() throws IOException {
     super.initialize();
     child.initialize();
@@ -94,10 +89,10 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
         if (next.hasNext()) {
           DataMapWriterListener listener = getDataMapWriterListener(0);
           CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-              .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++, listener);
+              .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++, listener);
           model.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap);
           CarbonFactHandler dataHandler = CarbonFactHandlerFactory
-              .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+              .createCarbonFactHandler(model);
           dataHandler.initialise();
           processBatch(next, dataHandler);
           finish(tableName, dataHandler);
@@ -157,9 +152,4 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     batch.close();
     rowCounter.getAndAdd(batchSize);
   }
-
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
 }
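The one behavioral change in this last file is the hardcoded 0 becoming i in the createCarbonFactDataHandlerModel call, so each parallel iterator now writes with its own bucket/range id instead of all sharing id 0. A compressed, self-contained view of the loop around the changed call (dependencies are passed in as parameters; control flow simplified, not commit code):

    import java.util.Iterator;

    import org.apache.carbondata.processing.datamap.DataMapWriterListener;
    import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
    import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;

    public class BucketIdSketch {
      static void createModels(Iterator<CarbonRowBatch>[] iterators,
          CarbonDataLoadConfiguration configuration, String[] storeLocation,
          DataMapWriterListener listener) {
        int k = 0;
        for (int i = 0; i < iterators.length; i++) {
          if (iterators[i].hasNext()) {
            // Before the fix the third argument was always 0, collapsing all
            // iterators onto the same bucket/range id.
            CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
                .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++, listener);
          }
        }
      }
    }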