Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r244680876
--- Diff:
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
}
}
+
+ /**
+ * 1. range partition the whole input data
+ * 2. for each range, sort the data and write it to CarbonData files
+ */
+ def loadDataUsingRangeSort(
+ sparkSession: SparkSession,
+ model: CarbonLoadModel,
+ hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+ // initialize and prepare row counter
+ val sc = sparkSession.sparkContext
+ val modelBroadcast = sc.broadcast(model)
+ val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+ val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+ val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+ val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+ val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+ // 1. Input
+ hadoopConf
+ .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+ val inputRDD = CsvRDDHelper
+ .csvFileScanRDD(sparkSession, model, hadoopConf)
+ .mapPartitionsWithIndex { case (index, rows) =>
+ DataLoadProcessorStepOnSpark
+ .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+ }
+
+ // 2. Convert
+ val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+ val convertRDD = inputRDD
+ .mapPartitionsWithIndex { case (index, rows) =>
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+ DataLoadProcessorStepOnSpark
+ .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+ }
+ .filter(_ != null)
+
+ // 3. Range partition by range_column
+ val configuration = DataLoadProcessBuilder.createConfiguration(model)
+ val rangeColumnIndex =
+ indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+ // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+ val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+ // range partition by key
+ val numPartitions = getNumPartitions(configuration, model, convertRDD)
+ val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+ import scala.reflect.classTag
+ val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+ val rangeRDD = keyRDD
+ .partitionBy(
+ new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+ .map(_._2)
+
+ // 4. Sort and Write data
+ sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+ DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+ writeStepRowCounter, conf.value.value))
+
+ // Log the number of rows in each step
+ LOGGER.info("Total rows processed in step Input Processor: " +
inputStepRowCounter.value)
+ LOGGER.info("Total rows processed in step Data Converter: " +
convertStepRowCounter.value)
+ LOGGER.info("Total rows processed in step Sort Processor: " +
sortStepRowCounter.value)
+ LOGGER.info("Total rows processed in step Data Writer: " +
writeStepRowCounter.value)
+
+ // Update status
+ if (partialSuccessAccum.value != 0) {
+ val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+ "Partial_Success"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+ val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+ executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+ Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+ } else {
+ val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+ val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+ Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+ }
+ }
+
+ /**
+ * provide an RDD for sampling
+ * CSVRecordReader (univocity parser) will output only one column
+ */
+ private def getSampleRDD(
+ sparkSession: SparkSession,
+ model: CarbonLoadModel,
+ hadoopConf: Configuration,
+ configuration: CarbonDataLoadConfiguration,
+ modelBroadcast: Broadcast[CarbonLoadModel]
+ ): RDD[(Object, Object)] = {
+ // initialize and prepare row counter
+ val configuration = DataLoadProcessBuilder.createConfiguration(model)
+ val header = configuration.getHeader
+ val rangeColumn = model.getRangePartitionColumn
+ val rangeColumnIndex = (0 until header.length).find{
+ index =>
+ header(index).equalsIgnoreCase(rangeColumn.getColName)
+ }.get
+ val rangeField = configuration
+ .getDataFields
+ .find(dataField => dataField.getColumn.getColName.equals(rangeColumn.getColName))
+ .get
+
+ // 1. Input
+ val newHadoopConf = new Configuration(hadoopConf)
+ newHadoopConf
+ .set(CSVInputFormat.SELECT_COLUMN_INDEX, "" + rangeColumnIndex)
+ val inputRDD = CsvRDDHelper
+ .csvFileScanRDD(sparkSession, model, newHadoopConf)
+ .mapPartitionsWithIndex { case (index, rows) =>
+ DataLoadProcessorStepOnSpark
+ .internalSampleInputFunc(rows, rangeField, index, modelBroadcast)
+ }
+
+ // 2. Convert
+ val conf = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, hadoopConf)
--- End diff --
Why don't you use the already broadcast conf, which is broadcast in the caller method?
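
Something like the sketch below is what I mean. It is a standalone Spark example, not the CarbonData code, and names such as sampleKeys, loadWithRangeSort, and range.column are made up for illustration: the caller broadcasts the configuration once and hands the same Broadcast handle to the helper, so getSampleRDD would not need to call SparkSQLUtil.broadCastHadoopConf a second time.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

object BroadcastReuseSketch {

  // The helper receives the Broadcast created by the caller instead of broadcasting again.
  def sampleKeys(
      spark: SparkSession,
      data: Seq[String],
      confBroadcast: Broadcast[Map[String, String]]) = {
    spark.sparkContext.parallelize(data).map { row =>
      // executors read the configuration from the shared broadcast value
      (confBroadcast.value.getOrElse("range.column", "col0"), row)
    }
  }

  def loadWithRangeSort(spark: SparkSession): Unit = {
    val props = Map("range.column" -> "id")
    // broadcast the configuration once, in the caller
    val confBroadcast = spark.sparkContext.broadcast(props)
    val keyed = sampleKeys(spark, Seq("a", "b", "c"), confBroadcast)
    keyed.collect().foreach(println)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("broadcast-reuse").getOrCreate()
    loadWithRangeSort(spark)
    spark.stop()
  }
}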
---