Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2971#discussion_r244681062
--- Diff:
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
---
@@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
}
}
+ def internalInputFunc(
+ rows: Iterator[InternalRow],
+ index: Int,
+ modelBroadcast: Broadcast[CarbonLoadModel],
+ rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+ val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+ val conf = DataLoadProcessBuilder.createConfiguration(model)
+ val rowParser = new RowParserImpl(conf.getDataFields, conf)
+ val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+ TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+ wrapException(e, model)
+ }
+
+ new Iterator[CarbonRow] {
+ override def hasNext: Boolean = rows.hasNext
+
+ override def next(): CarbonRow = {
+ var row : CarbonRow = null
+ val rawRow =
+ rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
+ if(isRawDataRequired) {
+ row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+ } else {
+ row = new CarbonRow(rowParser.parseRow(rawRow))
+ }
+ rowCounter.add(1)
+ row
+ }
+ }
+ }
+
+ def internalSampleInputFunc(
--- End diff --
Please unify `internalSampleInputFunc` and `internalInputFunc`.
---