Github user qiuchenjian commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r238292207
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
           Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
         }
       }
    +
    +  /**
    +   * 1. range partition the whole input data
    +   * 2. for each range, sort the data and write it to CarbonData files
    +   */
    +  def loadDataUsingRangeSort(
    +      sparkSession: SparkSession,
    +      dataFrame: Option[DataFrame],
    +      model: CarbonLoadModel,
    +      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    +    val originRDD = if (dataFrame.isDefined) {
    +      dataFrame.get.rdd
    +    } else {
    +      // input data from files
    +      val columnCount = model.getCsvHeaderColumns.length
    +      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
    +        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
    +    }
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
    +    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
    +    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
    +    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
    +    hadoopConf
    +      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
    +    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
    +    // 1. Input
    +    val inputRDD = originRDD
    +      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
    +      .mapPartitionsWithIndex { case (index, rows) =>
    +        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    +      }
    +    // 2. Convert
    +    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
    +      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
    +      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
    +        convertStepRowCounter)
    +    }.filter(_ != null)
    +    // 3. Range partition
    +    val configuration = DataLoadProcessBuilder.createConfiguration(model)
    +    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
    +    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
    +      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
    +    if (numPartitions <= 0) {
    +      if (model.getTotalSize <= 0) {
    +        numPartitions = convertRDD.partitions.length
    +      } else {
    +        // calculate the number of partitions
    +        // better to generate a CarbonData file for each partition
    +        val totalSize = model.getTotalSize.toDouble
    +        val table = model.getCarbonDataLoadSchema.getCarbonTable
    +        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
    +        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
    +        // here it assumes the compression ratio of CarbonData is about 33%,
    +        // so it multiplies by 3 to get the split size of CSV files.
    +        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 3
    +        numPartitions = Math.ceil(totalSize / splitSize).toInt
    --- End diff --
    
    If the insert is done using a DataFrame, I think totalSize will be 0.
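    
    To make that concrete, here is a small standalone sketch (not part of this patch; the object and parameter names are only for illustration) of the partition-count logic in the diff. For a CSV load the size-based branch applies, but for a DataFrame insert `model.getTotalSize` is 0, so only the fallback to `convertRDD.partitions.length` is ever taken:
    
    ```scala
    object RangeSortPartitionCountSketch {
    
      // Re-statement of the partition-count logic above, outside the load flow.
      def numPartitions(
          configuredPartitions: Int, // LOAD_GLOBAL_SORT_PARTITIONS, <= 0 if unset
          totalSizeInBytes: Long,    // model.getTotalSize, 0 for a DataFrame insert
          rddPartitions: Int,        // convertRDD.partitions.length
          blockSizeInMB: Int,
          blockletSizeInMB: Int): Int = {
        if (configuredPartitions > 0) {
          configuredPartitions
        } else if (totalSizeInBytes <= 0) {
          // DataFrame inserts end up here: no file size is known for the input
          rddPartitions
        } else {
          val blockSize = 1024L * 1024 * blockSizeInMB
          val blockletSize = 1024L * 1024 * blockletSizeInMB
          // assumed ~33% compression ratio, hence the "* 3" on the CSV split size
          val splitSize = Math.max(blockletSize, blockSize - blockletSize) * 3
          Math.ceil(totalSizeInBytes.toDouble / splitSize).toInt
        }
      }
    
      def main(args: Array[String]): Unit = {
        // CSV load: 10 GB input, 256 MB blocks, 64 MB blocklets
        // splitSize = max(64, 192) MB * 3 = 576 MB  =>  ceil(10240 / 576) = 18
        println(numPartitions(0, 10L * 1024 * 1024 * 1024, 40, 256, 64)) // 18
        // DataFrame insert: totalSize is 0, so the RDD partition count (40) is kept
        println(numPartitions(0, 0L, 40, 256, 64)) // 40
      }
    }
    ```
    
    If the size-based calculation should also cover the DataFrame path, one possibility might be to derive an estimate from Catalyst's plan statistics (for example `dataFrame.queryExecution.optimizedPlan.stats.sizeInBytes` on Spark 2.3+) and use that as the total size; that is only a suggestion, not something this patch does.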
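    
    As a side note for anyone skimming the diff: the two steps in the method's doc comment (range partition the whole input, then sort each range before writing) follow the plain-Spark pattern sketched below with toy data. This is only illustrative, not the code path added here, which goes through `DataLoadProcessorStepOnSpark` and writes CarbonData files:
    
    ```scala
    import org.apache.spark.RangePartitioner
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object RangeSortPatternSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("range-sort-sketch").getOrCreate()
        val sc = spark.sparkContext
    
        // toy rows keyed by the range column (an Int here, a table column in the real flow)
        val keyed: RDD[(Int, String)] = sc.parallelize(Seq(5 -> "e", 1 -> "a", 9 -> "i", 3 -> "c", 7 -> "g"))
    
        // 1. range partition the whole input on the key
        val partitioner = new RangePartitioner(2, keyed)
        // 2. sort within each range; each sorted partition could then be written out as one file
        val ranged = keyed.repartitionAndSortWithinPartitions(partitioner)
    
        ranged.glom().collect().foreach(part => println(part.mkString(", ")))
        spark.stop()
      }
    }
    ```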

