Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r245223174

    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
    @@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
         }
       }

    +  def internalInputFunc(
    +      rows: Iterator[InternalRow],
    +      index: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
    +    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
    +    val conf = DataLoadProcessBuilder.createConfiguration(model)
    +    val rowParser = new RowParserImpl(conf.getDataFields, conf)
    +    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
    +    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
    +      wrapException(e, model)
    +    }
    +
    +    new Iterator[CarbonRow] {
    +      override def hasNext: Boolean = rows.hasNext
    +
    +      override def next(): CarbonRow = {
    +        var row : CarbonRow = null
    +        val rawRow =
    +          rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
    +        if(isRawDataRequired) {
    +          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
    +        } else {
    +          row = new CarbonRow(rowParser.parseRow(rawRow))
    +        }
    +        rowCounter.add(1)
    +        row
    +      }
    +    }
    +  }
    +
    +  def internalSampleInputFunc(
    --- End diff --

    ok
---