Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1972#discussion_r168700095
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 ---
    @@ -676,6 +734,48 @@ case class CarbonLoadDataCommand(
         }
       }
     
    +  private def convertData(
    +      originRDD: RDD[Row],
    +      sparkSession: SparkSession,
    +      model: CarbonLoadModel,
    +      isDataFrame: Boolean): RDD[InternalRow] = {
    +    model.setPartitionId("0")
    +    val sc = sparkSession.sparkContext
    +    val modelBroadcast = sc.broadcast(model)
    +    val partialSuccessAccum = sc.accumulator(0, "Partial Success 
Accumulator")
    +
    +    val inputStepRowCounter = sc.accumulator(0, "Input Processor 
Accumulator")
    +    // 1. Input
    +    var convertRDD =
    +      if (isDataFrame) {
    +        originRDD.mapPartitions{rows =>
    +          DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)
    +        }
    +      } else {
    +        originRDD.map{row =>
    +          val array = new Array[AnyRef](row.length)
    +          var i = 0
    +          while (i < array.length) {
    +            array(i) = row.get(i).asInstanceOf[AnyRef]
    +            i = i + 1
    +          }
    +          array
    +        }
    +      }
    +    val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) 
=>
    +        DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
    --- End diff --
    
    setDataTypeConverter is a static method. I think in concurrent scenarios if 
this is getting called from multiple places..this value can be get overriden


---

Reply via email to