Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1972#discussion_r168705495
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
---
@@ -592,41 +666,12 @@ case class CarbonLoadDataCommand(
case _ => false
}
}
- val len = rowDataTypes.length
- var rdd =
- DataLoadingUtil.csvFileScanRDD(
- sparkSession,
- model = carbonLoadModel,
- hadoopConf)
- .map { row =>
- val data = new Array[Any](len)
- var i = 0
- val input =
row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]]
- val inputLen = Math.min(input.length, len)
- while (i < inputLen) {
- data(i) = UTF8String.fromString(input(i))
- // If partition column then update empty value with
special string otherwise spark
- // makes it as null so we cannot internally handle
badrecords.
- if (partitionColumns(i)) {
- if (input(i) != null && input(i).isEmpty) {
- data(i) =
UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
- }
- }
- i = i + 1
- }
- InternalRow.fromSeq(data)
-
- }
- // Only select the required columns
- val output = if (partition.nonEmpty) {
- val lowerCasePartition = partition.map{case(key, value) =>
(key.toLowerCase, value)}
- catalogTable.schema.map { attr =>
- attributes.find(_.name.equalsIgnoreCase(attr.name)).get
- }.filter(attr =>
lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
- } else {
- catalogTable.schema.map(f =>
attributes.find(_.name.equalsIgnoreCase(f.name)).get)
- }
- Project(output, LogicalRDD(attributes, rdd)(sparkSession))
+ val columnCount = carbonLoadModel.getCsvHeaderColumns.length
+ var rdd = DataLoadingUtil.csvFileScanRDD(
--- End diff --
This `var` can be a `val`, since `rdd` is never reassigned after initialization.
---