Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168700095
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
---
@@ -676,6 +734,48 @@ case class CarbonLoadDataCommand(
}
}
+ private def convertData(
+ originRDD: RDD[Row],
+ sparkSession: SparkSession,
+ model: CarbonLoadModel,
+ isDataFrame: Boolean): RDD[InternalRow] = {
+ model.setPartitionId("0")
+ val sc = sparkSession.sparkContext
+ val modelBroadcast = sc.broadcast(model)
+ val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+
+ val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+ // 1. Input
+ var convertRDD =
+ if (isDataFrame) {
+ originRDD.mapPartitions{rows =>
+ DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)
+ }
+ } else {
+ originRDD.map{row =>
+ val array = new Array[AnyRef](row.length)
+ var i = 0
+ while (i < array.length) {
+ array(i) = row.get(i).asInstanceOf[AnyRef]
+ i = i + 1
+ }
+ array
+ }
+ }
+ val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) =>
+ DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
--- End diff --
setDataTypeConverter is a static method. I think that in concurrent scenarios, if this is called from multiple places, this value can get overridden.
---