Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r245224484
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    // initialize and prepare row counter
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +
    +    // 1. Input
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, hadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
    +      }
    +      .filter(_ != null)
    +
    +    // 3. Range partition by range_column
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val rangeColumnIndex =
    +      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
    +    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
    +    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
    +    // range partition by key
    +    val numPartitions = getNumPartitions(configuration, model, convertRDD)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    import scala.reflect.classTag
    +    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
    +    val rangeRDD = keyRDD
    +      .partitionBy(
    +        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
    +      .map(_._2)
    +
    +    // 4. Sort and Write data
    +    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
    +      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
    +        writeStepRowCounter, conf.value.value))
    +
    +    // Log the number of rows in each step
    +    LOGGER.info("Total rows processed in step Input Processor: " + 
inputStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Converter: " + 
convertStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Sort Processor: " + 
sortStepRowCounter.value)
    +    LOGGER.info("Total rows processed in step Data Writer: " + 
writeStepRowCounter.value)
    +
    +    // Update status
    +    if (partialSuccessAccum.value != 0) {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
    +                               "Partial_Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    } else {
    +      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
    +      val loadMetadataDetails = new LoadMetadataDetails()
    +      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
    +      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
    +      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
    +    }
    +  }
    +
    +  /**
    +   * Provides the RDD used for sampling.
    +   * CSVRecordReader (univocity parser) will output only one column.
    +   */
    +  private def getSampleRDD(
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration,
    +      configuration: CarbonDataLoadConfiguration,
    +      modelBroadcast: Broadcast[CarbonLoadModel]
    +  ): RDD[(Object, Object)] = {
    +    // prepare the configuration and locate the range column in the CSV header
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val header = configuration.getHeader
    +    val rangeColumn = model.getRangePartitionColumn
    +    val rangeColumnIndex = (0 until header.length).find{
    +      index =>
    +        header(index).equalsIgnoreCase(rangeColumn.getColName)
    +    }.get
    +    val rangeField = configuration
    +      .getDataFields
    +      .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName))
    +      .get
    +
    +    // 1. Input
    +    val newHadoopConf = new Configuration(hadoopConf)
    +    newHadoopConf
    +      .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex)
    +    val inputRDD = CsvRDDHelper
    +      .csvFileScanRDD(sparkSession, model, newHadoopConf)
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark
    +          .internalSampleInputFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +
    +    // 2. Convert
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf)
    +    val convertRDD = inputRDD
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +        DataLoadProcessorStepOnSpark
    +          .sampleConvertFunc(rows, rangeField, index, modelBroadcast)
    +      }
    +      .filter(_ != null)
    +
    +    convertRDD.map(row => (row.getObject(0), null))
    +  }
    +
    +  /**
    +   * calculate the number of partitions.
    +   */
    +  private def getNumPartitions(
    +      configuration: CarbonDataLoadConfiguration,
    +      model: CarbonLoadModel,
    +      convertRDD: RDD[CarbonRow]
    +  ): Int = {
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        val scaleFactor = if (model.getScaleFactor == 0) {
    +          // here it assumes the compression ratio of CarbonData is about 30%,
    +          // so it multiplies by 3 to get the split size of the CSV files.
    +          3
    +        } else {
    +          model.getScaleFactor
    +        }
    +        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * scaleFactor
    --- End diff ---
    
    ok
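
For reference, a minimal standalone sketch of the partition-count arithmetic in this hunk (the 10 GB input size, 1024 MB block size and 64 MB blocklet size are made-up example values, and the final ceil-division is assumed from what follows the quoted hunk, not shown in this diff):

    object NumPartitionsSketch {
      // hypothetical example inputs, not taken from the PR
      val totalSize: Double = 10L * 1024 * 1024 * 1024  // 10 GB of input CSV, in bytes
      val blockSizeInMB: Long = 1024                     // table block size
      val blockletSizeInMB: Long = 64                    // table blocklet size

      def numPartitions(scaleFactor: Int = 3): Int = {
        val blockSize = 1024L * 1024 * blockSizeInMB
        val blockletSize = 1024L * 1024 * blockletSizeInMB
        // assuming a CarbonData compression ratio of about 30%, each range
        // partition should read roughly scaleFactor (default 3) times the
        // target block size worth of CSV input
        val splitSize = Math.max(blockletSize, blockSize - blockletSize) * scaleFactor
        Math.ceil(totalSize / splitSize).toInt
      }

      def main(args: Array[String]): Unit = {
        // 10 GB / ((1024 - 64) MB * 3) ≈ 3.56, rounded up to 4 partitions
        println(numPartitions())
      }
    }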

