Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1632#discussion_r157112148
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -121,17 +121,18 @@ object DataLoadProcessBuilderOnSpark {
         CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
     }
+    val sortStepRowConverter: SortStepRowHandler = new SortStepRowHandler(sortParameters)
     import scala.reflect.classTag
+
+    // 3. sort
     val sortRDD = convertRDD
-      .sortBy(_.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
-      .mapPartitionsWithIndex { case (index, rows) =>
-        DataLoadProcessorStepOnSpark.convertTo3Parts(rows, index, modelBroadcast,
-          sortStepRowCounter)
-      }
+      .map(r => DataLoadProcessorStepOnSpark.convertTo3Parts(r, TaskContext.getPartitionId(),
+        modelBroadcast, sortStepRowConverter, sortStepRowCounter))
+      .sortBy(r => r.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
--- End diff --
This code change is not related to the sort temp file. I changed it because the interface and the internal load procedure have changed.
After `convertRDD`, each row is still a raw row. In the sort phase, rows are converted to 3-parts; in the write phase, rows are encoded and written.
In the previous implementation, CarbonData sorted these raw rows and then converted each row to 3-parts in batches. In the current implementation, CarbonData first converts each row to 3-parts and then sorts the converted rows, as shown in the sketch below.
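For illustration only, here is a minimal plain-Spark sketch of the reordering; `ReorderSketch` and `convertOne` are hypothetical stand-ins, not CarbonData code:

```scala
import org.apache.spark.{SparkContext, TaskContext}

object ReorderSketch {
  // Hypothetical stand-in for the real per-row conversion.
  def convertOne(row: Array[AnyRef], partitionId: Int): Array[AnyRef] = row

  def run(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Seq(Array[AnyRef]("b"), Array[AnyRef]("a")))

    // Previous order: sort the raw rows first, then convert them per
    // partition in batch (conversion runs after the shuffle).
    val previous = rdd
      .sortBy(_.head.toString)
      .mapPartitionsWithIndex { case (index, rows) => rows.map(convertOne(_, index)) }

    // Current order: convert each row first, then sort the converted rows
    // (conversion runs before the shuffle).
    val current = rdd
      .map(r => convertOne(r, TaskContext.getPartitionId()))
      .sortBy(_.head.toString)

    previous.collect()
    current.collect()
  }
}
```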
While converting a raw row to a 3-parts row, the interface (`DataLoadProcessorStepOnSpark.convertTo3Parts`) has changed: it previously processed a batch of rows, but now it processes one row at a time.
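To make the batch-to-row change concrete, this sketch shows the two shapes of `convertTo3Parts` as they appear at the call sites in the diff; the row and parameter types here are assumptions for illustration, not the exact CarbonData signatures:

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.LongAccumulator

// Sketch only: the abstract types stand in for the real CarbonData classes,
// and the parameter lists are inferred from the call sites in the diff.
trait ConvertTo3PartsShape {
  type CarbonRow
  type CarbonLoadModel
  type SortStepRowHandler

  // Previous shape: batch-oriented, called once per partition from
  // mapPartitionsWithIndex(index, rows).
  def convertTo3Parts(
      rows: Iterator[CarbonRow],
      index: Int,
      modelBroadcast: Broadcast[CarbonLoadModel],
      rowCounter: LongAccumulator): Iterator[CarbonRow]

  // Current shape: row-oriented, called once per record from map; the
  // SortStepRowHandler now carries the raw-row to 3-parts conversion logic.
  def convertTo3Parts(
      row: CarbonRow,
      index: Int,
      modelBroadcast: Broadcast[CarbonLoadModel],
      rowConverter: SortStepRowHandler,
      rowCounter: LongAccumulator): CarbonRow
}
```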
---