Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r238505807
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +158,132 @@ object DataLoadProcessBuilderOnSpark {
Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
}
}
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      dataFrame: Option[DataFrame],
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    val originRDD = if (dataFrame.isDefined) {
+      dataFrame.get.rdd
+    } else {
+      // input data from files
+      val columnCount = model.getCsvHeaderColumns.length
+      CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
+        .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
+    }
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    // 1. Input
+    val inputRDD = originRDD
+      .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+    // 2. Convert
+    val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
+        convertStepRowCounter)
+    }.filter(_ != null)
+    // 3. Range partition
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
+      configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
+    if (numPartitions <= 0) {
+      if (model.getTotalSize <= 0) {
+        numPartitions = convertRDD.partitions.length
+      } else {
+        // calculate the number of partitions
+        // better to generate a CarbonData file for each partition
+        val totalSize = model.getTotalSize.toDouble
+        val table = model.getCarbonDataLoadSchema.getCarbonTable
+        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
+        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
+        // here it assumes the compression ratio of CarbonData is about 33%,
+        // so it multiplies by 3 to get the split size of CSV files.
+        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * 3
+        numPartitions = Math.ceil(totalSize / splitSize).toInt
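+        // e.g. (illustrative numbers, not from the PR): a 1024 MB block
+        // size with a 64 MB blocklet size gives
+        // splitSize = max(64 MB, 960 MB) * 3 = 2880 MB, so a 10 GB input
+        // yields ceil(10240 / 2880) = 4 partitions, i.e. roughly one
+        // CarbonData file per range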
--- End diff ---
yes, insert will use global sort
---
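For readers unfamiliar with the pattern in the diff's Javadoc ("range partition, then sort each range"), the two steps map onto standard Spark primitives. Below is a minimal, self-contained sketch on a plain (key, row) pair RDD; the object name RangeSortSketch, the sample data, and the println stand-in for the CarbonData writer are illustrative, not the PR's API.

import org.apache.spark.RangePartitioner
import org.apache.spark.sql.SparkSession

// Minimal sketch of "range partition the whole input, then sort within
// each range". Everything here is illustrative; the PR operates on
// CarbonRow with a generated Ordering, not on (Int, String) pairs.
object RangeSortSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("range-sort-sketch")
      .master("local[*]")
      .getOrCreate()
    val rows = spark.sparkContext.parallelize(
      Seq((3, "c"), (1, "a"), (5, "e"), (2, "b"), (4, "d")))

    // 1. range partition by the sort key: RangePartitioner samples the
    //    RDD to choose split points for the requested partition count
    val partitioner = new RangePartitioner(2, rows)

    // 2. sort inside each range; because the ranges themselves are
    //    ordered, the output partitions form a total order overall
    val sorted = rows.repartitionAndSortWithinPartitions(partitioner)

    // stand-in for writing one CarbonData file per range
    sorted.foreachPartition(part => part.foreach(println))
    spark.stop()
  }
}

With two partitions, one range would hold keys {1, 2} and the other {3, 4, 5}, each sorted locally, which is the property the range-sort load path relies on to produce one sorted CarbonData file per range.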