Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r245224484

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    // initialize and prepare row counters
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +
    +    // 1. Input
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, hadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
    +      }
    +      .filter(_ != null)
    +
    +    // 3. Range partition by range_column
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val rangeColumnIndex =
    +      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
    +    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
    +    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
    +    // range partition by key
    +    val numPartitions = getNumPartitions(configuration, model, convertRDD)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    import scala.reflect.classTag
    +    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
    +    val rangeRDD = keyRDD
    +      .partitionBy(
    +        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
    +      .map(_._2)
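Aside on step 3 above: keyBy followed by partitionBy is Spark's standard range-partitioning shape. A minimal, hypothetical sketch using Spark's built-in RangePartitioner (DataSkewRangePartitioner and the sample RDD are CarbonData-specific and additionally handle skewed keys on top of this pattern):

    import org.apache.spark.RangePartitioner
    import org.apache.spark.sql.SparkSession

    // hypothetical demo object, not part of the PR
    object RangePartitionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
        val sc = spark.sparkContext
        // key each row by its range column, as keyRDD does above
        val keyed = sc.parallelize(Seq(5, 1, 9, 3, 7, 2, 8)).map(v => (v, s"row-$v"))
        // each partition then holds a contiguous key range, so sorting
        // within a partition yields globally range-ordered output files
        val ranged = keyed.partitionBy(new RangePartitioner(3, keyed))
        ranged.mapPartitionsWithIndex { (i, it) =>
          Iterator((i, it.map(_._1).toList.sorted))
        }.collect().foreach(println)
        spark.stop()
      }
    }

This is why step 4 only needs a per-partition sort: the partitioner has already placed disjoint key ranges on disjoint partitions.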
    +
    +    // 4. Sort and write data
    +    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
    +      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
    +        writeStepRowCounter, conf.value.value))
    +
    +    // Log the number of rows in each step
    +    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
    +
    +    // Update status
    +    if (partialSuccessAccum.value != 0) {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
    +                               "Partial_Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    } else {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    }
    +  }
    +
    +  /**
    +   * provide the RDD used for sampling the range column
    +   * CSVRecordReader (univocity parser) will output only the selected column
    +   */
    +  private def getSampleRDD(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      configuration: CarbonDataLoadConfiguration,
    +      modelBroadcast: Broadcast[CarbonLoadModel]
    +  ): RDD[(Object, Object)] = {
    +    val header = configuration.getHeader
    +    val rangeColumn = model.getRangePartitionColumn
    +    val rangeColumnIndex = (0 until header.length).find { index =>
    +      header(index).equalsIgnoreCase(rangeColumn.getColName)
    +    }.get
    +    val rangeField = configuration
    +      .getDataFields
    +      .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName))
    +      .get
    +
    +    // 1. Input: read only the range column from the CSV files
    +    val newHadoopConf = new Configuration(hadoopConf)
    +    newHadoopConf
    +      .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, newHadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalSampleInputFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .sampleConvertFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +      .filter(_ != null)
    +
    +    convertRDD.map(row => (row.getObject(0), null))
    +  }
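Aside: the sample RDD exists so split points can be derived from the range column alone, without shuffling the full data. A toy, self-contained sketch of the underlying idea (illustrative only; the real DataSkewRangePartitioner also accounts for heavily repeated keys):

    // hypothetical helper, not part of the PR
    object RangeBoundsSketch {
      // choose (numPartitions - 1) split points from a sorted sample
      def bounds(sample: Seq[Int], numPartitions: Int): Seq[Int] = {
        val sorted = sample.sorted
        (1 until numPartitions).map { i =>
          sorted(math.min(sorted.length - 1, i * sorted.length / numPartitions))
        }
      }

      def main(args: Array[String]): Unit = {
        val sample = Seq(1, 2, 2, 2, 3, 10, 11, 50, 51, 52, 53, 99)
        // prints Vector(2, 11, 52); each row routes to the first range
        // whose upper bound is >= its key
        println(bounds(sample, 4))
      }
    }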
    +
    +  /**
    +   * calculate the number of partitions
    +   */
    +  private def getNumPartitions(
    +      configuration: CarbonDataLoadConfiguration,
    +      model: CarbonLoadModel,
    +      convertRDD: RDD[CarbonRow]
    +  ): Int = {
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions;
    +        // ideally each partition generates one CarbonData file
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        val scaleFactor = if (model.getScaleFactor == 0) {
    +          // assume CarbonData's compression ratio is about 30%,
    +          // so multiply by 3 to get the split size of the CSV files
    +          3
    +        } else {
    +          model.getScaleFactor
    +        }
    +        val splitSize = Math.max(blockletSize, blockSize - blockletSize) * scaleFactor

    --- End diff --

ok
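For intuition on the split-size formula at the end of the diff, a worked instance with illustrative sizes (a 1024 MB block and 64 MB blocklet are used here for concreteness; actual values come from the table's settings):

    // hypothetical demo, not part of the PR; sizes are illustrative
    object SplitSizeSketch {
      def main(args: Array[String]): Unit = {
        val mb = 1024L * 1024
        val blockSize = 1024 * mb   // table block size (1 GB here)
        val blockletSize = 64 * mb  // blocklet size (64 MB here)
        val scaleFactor = 3         // assumes ~30% CarbonData compression ratio
        // max(64 MB, 1024 MB - 64 MB) * 3 = 960 MB * 3 = 2880 MB of CSV per split
        val splitSize = Math.max(blockletSize, blockSize - blockletSize) * scaleFactor
        println(s"${splitSize / mb} MB per range partition") // 2880 MB
      }
    }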