Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1632#discussion_r156954293
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
    @@ -121,17 +121,18 @@ object DataLoadProcessBuilderOnSpark {
             CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
         }
     
    +    val sortStepRowConverter: SortStepRowHandler = new SortStepRowHandler(sortParameters)
         import scala.reflect.classTag
    +
    +    // 3. sort
         val sortRDD = convertRDD
    -      .sortBy(_.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
    -      .mapPartitionsWithIndex { case (index, rows) =>
    -        DataLoadProcessorStepOnSpark.convertTo3Parts(rows, index, modelBroadcast,
    -          sortStepRowCounter)
    -      }
    +      .map(r => DataLoadProcessorStepOnSpark.convertTo3Parts(r, TaskContext.getPartitionId(),
    +        modelBroadcast, sortStepRowConverter, sortStepRowCounter))
    +      .sortBy(r => r.getData, numPartitions = numPartitions)(RowOrdering, classTag[Array[AnyRef]])
    --- End diff --
    
    @xuchuanyin ...
    This PR is for compressing sort temp files, but this code modification is in the data load path that uses the global sort flow, which does not involve the creation of sort temp files. Can you please clarify?
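    For reference, the reordering in the quoted diff can be sketched with plain Scala collections (no Spark): the old flow sorted the raw rows and then ran the conversion per partition via `mapPartitionsWithIndex`, while the new flow converts each row up front and sorts the converted rows. The `convert` function and the sample rows below are illustrative stand-ins, not the actual `convertTo3Parts` logic.
    
    ```scala
    object SortReorderSketch {
      // Illustrative stand-in for DataLoadProcessorStepOnSpark.convertTo3Parts:
      // here it merely appends a marker; the real step rewrites the row into
      // its 3-parted sort representation.
      def convert(row: Array[AnyRef]): Array[AnyRef] = row :+ "converted"
    
      def main(args: Array[String]): Unit = {
        val rows: Seq[Array[AnyRef]] =
          Seq(Array[AnyRef]("b"), Array[AnyRef]("a"), Array[AnyRef]("c"))
    
        // Old flow: sort the raw rows first, then convert them afterwards
        // (per partition in the real RDD code).
        val oldFlow = rows.sortBy(_.head.toString).map(convert)
    
        // New flow: convert every row first, then sort the converted rows.
        val newFlow = rows.map(convert).sortBy(_.head.toString)
    
        // With a conversion that preserves the sort key, both orders agree.
        println(oldFlow.map(_.head).mkString(",")) // a,b,c
        println(newFlow.map(_.head).mkString(",")) // a,b,c
      }
    }
    ```
    
    The observable difference is only where the conversion cost is paid relative to the shuffle, which is why the question above is about whether this reordering belongs in a sort-temp-file compression PR at all.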

