Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1632#discussion_r156954293
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -121,17 +121,18 @@ object DataLoadProcessBuilderOnSpark {
         CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
     }
+    val sortStepRowConverter: SortStepRowHandler = new SortStepRowHandler(sortParameters)
     import scala.reflect.classTag
+
+    // 3. sort
     val sortRDD = convertRDD
-      .sortBy(_.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
-      .mapPartitionsWithIndex { case (index, rows) =>
-        DataLoadProcessorStepOnSpark.convertTo3Parts(rows, index, modelBroadcast,
-          sortStepRowCounter)
-      }
+      .map(r => DataLoadProcessorStepOnSpark.convertTo3Parts(r, TaskContext.getPartitionId(),
+        modelBroadcast, sortStepRowConverter, sortStepRowCounter))
+      .sortBy(r => r.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
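
The diff above swaps the order of the two RDD stages: the old flow sorted the raw rows and then converted each partition via mapPartitionsWithIndex, while the new flow converts every row first (reading the partition id from TaskContext) and sorts the converted rows. A minimal sketch of that reordering, using plain Scala collections and hypothetical Raw/Converted types in place of the CarbonData row types:

```scala
// Hypothetical stand-ins for the real row types in this file.
case class Raw(key: Int)
case class Converted(key: Int, partitionId: Int)

object ConvertThenSortSketch {
  // Stand-in for DataLoadProcessorStepOnSpark.convertTo3Parts: per-row
  // conversion that also records which partition produced the row.
  def convert(r: Raw, partitionId: Int): Converted =
    Converted(r.key, partitionId)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Raw(3), Raw(1), Raw(2))

    // New flow in the diff: map (convert) each row first, then sort
    // the converted rows by their sort key.
    val sorted = rows.map(r => convert(r, partitionId = 0)).sortBy(_.key)

    println(sorted.map(_.key).mkString(","))
  }
}
```

This only illustrates the ordering of the transformations; in the actual Spark flow, sortBy triggers a shuffle, so converting before the sort means the shuffled data is already in the converted (3-parts) representation.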
--- End diff --
@xuchuanyin ...
This PR is for compressing sort temp files, but this code modification is in the global sort data load flow, which does not create sort temp files. Can you please clarify?
---