Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2971#discussion_r245223174
  
    --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
    @@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
         }
       }
     
    +  def internalInputFunc(
    +      rows: Iterator[InternalRow],
    +      index: Int,
    +      modelBroadcast: Broadcast[CarbonLoadModel],
    +      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
    +    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
    +    val conf = DataLoadProcessBuilder.createConfiguration(model)
    +    val rowParser = new RowParserImpl(conf.getDataFields, conf)
    +    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
    +    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
    +      wrapException(e, model)
    +    }
    +
    +    new Iterator[CarbonRow] {
    +      override def hasNext: Boolean = rows.hasNext
    +
    +      override def next(): CarbonRow = {
    +        var row : CarbonRow = null
    +        val rawRow =
    +          rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
    +        if(isRawDataRequired) {
    +          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
    +        } else {
    +          row = new CarbonRow(rowParser.parseRow(rawRow))
    +        }
    +        rowCounter.add(1)
    +        row
    +      }
    +    }
    +  }
    +
    +  def internalSampleInputFunc(
    --- End diff --
    
    ok


---

Reply via email to