Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r159204374
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
// converted to hive standard fomat to let spark understand the data to partition.
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- val failAction =
- carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
- val ignoreAction =
- carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
- val query: LogicalPlan = if (dataFrame.isDefined) {
- var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
- val timeStampFormat = new SimpleDateFormat(timeStampformatString)
- var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
- val dateFormat = new SimpleDateFormat(dateFormatString)
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
- val serializationNullFormat =
- carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- val attributes =
- StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
- val len = attributes.length
- val rdd = dataFrame.get.rdd.map { f =>
- val data = new Array[Any](len)
- var i = 0
- while (i < len) {
- data(i) =
- UTF8String.fromString(
- CarbonScalaUtil.getString(f.get(i),
- serializationNullFormat,
- delimiterLevel1,
- delimiterLevel2,
- timeStampFormat,
- dateFormat))
- i = i + 1
+ val badRecordAction =
+ carbonLoadModel.getBadRecordsAction.split(",")(1)
+ var timeStampformatString = carbonLoadModel.getTimestampformat
+ if (timeStampformatString.isEmpty) {
+ timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+ }
+ val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+ var dateFormatString = carbonLoadModel.getDateFormat
+ if (dateFormatString.isEmpty) {
+ dateFormatString = carbonLoadModel.getDefaultDateFormat
+ }
+ val dateFormat = new SimpleDateFormat(dateFormatString)
+ CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+ timeStampformatString)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+ serializationNullFormat)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ badRecordAction)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+ try {
+ val query: LogicalPlan = if (dataFrame.isDefined) {
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ val attributes =
+ StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+ val len = attributes.length
+ val rdd = dataFrame.get.rdd.map { f =>
+ val data = new Array[Any](len)
+ var i = 0
+ while (i < len) {
+ data(i) =
+ UTF8String.fromString(
+ CarbonScalaUtil.getString(f.get(i),
+ serializationNullFormat,
+ delimiterLevel1,
+ delimiterLevel2,
+ timeStampFormat,
+ dateFormat))
+ i = i + 1
+ }
+ InternalRow.fromSeq(data)
}
- InternalRow.fromSeq(data)
- }
- if (updateModel.isDefined) {
- sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
- // In case of update, we don't need the segmrntid column in case of partitioning
- val dropAttributes = attributes.dropRight(1)
- val finalOutput = catalogTable.schema.map { attr =>
- dropAttributes.find { d =>
- val index = d.name.lastIndexOf("-updatedColumn")
- if (index > 0) {
- d.name.substring(0, index).equalsIgnoreCase(attr.name)
- } else {
- d.name.equalsIgnoreCase(attr.name)
- }
- }.get
+ if (updateModel.isDefined) {
+ sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+ // In case of update, we don't need the segmrntid column in case of partitioning
+ val dropAttributes = attributes.dropRight(1)
+ val finalOutput = catalogTable.schema.map { attr =>
+ dropAttributes.find { d =>
+ val index = d.name.lastIndexOf("-updatedColumn")
+ if (index > 0) {
+ d.name.substring(0, index).equalsIgnoreCase(attr.name)
+ } else {
+ d.name.equalsIgnoreCase(attr.name)
+ }
+ }.get
+ }
+ Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
+ } else {
+ LogicalRDD(attributes, rdd)(sparkSession)
}
- Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
- } else {
- LogicalRDD(attributes, rdd)(sparkSession)
- }
- } else {
- var timeStampformatString = carbonLoadModel.getTimestampformat
- if (timeStampformatString.isEmpty) {
- timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
- }
- val timeStampFormat = new SimpleDateFormat(timeStampformatString)
- var dateFormatString = carbonLoadModel.getDateFormat
- if (dateFormatString.isEmpty) {
- dateFormatString = carbonLoadModel.getDefaultDateFormat
- }
- val dateFormat = new SimpleDateFormat(dateFormatString)
- // input data from csv files. Convert to logical plan
- CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
- hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
- val jobConf = new JobConf(hadoopConf)
- SparkHadoopUtil.get.addCredentials(jobConf)
- val attributes =
- StructType(carbonLoadModel.getCsvHeaderColumns.map(
- StructField(_, StringType))).toAttributes
- val rowDataTypes = attributes.map { attribute =>
- catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
- case Some(attr) => attr.dataType
- case _ => StringType
+ } else {
+ // input data from csv files. Convert to logical plan
+ CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val attributes =
+ StructType(carbonLoadModel.getCsvHeaderColumns.map(
+ StructField(_, StringType))).toAttributes
+ val rowDataTypes = attributes.map { attribute =>
+ catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
+ case Some(attr) => attr.dataType
+ case _ => StringType
+ }
}
- }
- val len = rowDataTypes.length
- // Fail row conversion if fail/ignore badrecord action is enabled
- val fail = failAction || ignoreAction
- var rdd =
- new NewHadoopRDD[NullWritable, StringArrayWritable](
- sparkSession.sparkContext,
- classOf[CSVInputFormat],
- classOf[NullWritable],
- classOf[StringArrayWritable],
- jobConf).map{ case (key, value) =>
+ val len = rowDataTypes.length
+ var rdd =
+ new NewHadoopRDD[NullWritable, StringArrayWritable](
+ sparkSession.sparkContext,
+ classOf[CSVInputFormat],
+ classOf[NullWritable],
+ classOf[StringArrayWritable],
+ jobConf).map { case (key, value) =>
val data = new Array[Any](len)
var i = 0
val input = value.get()
val inputLen = Math.min(input.length, len)
- try {
- while (i < inputLen) {
- // TODO find a way to avoid double conversion of date and time.
- data(i) = CarbonScalaUtil.convertToUTF8String(
- input(i),
- rowDataTypes(i),
- timeStampFormat,
- dateFormat,
- serializationNullFormat,
- fail)
- i = i + 1
- }
- InternalRow.fromSeq(data)
- } catch {
- case e: Exception =>
- if (failAction) {
- // It is badrecord fail case.
- throw new BadRecordFoundException(
- s"Data load failed due to bad record: " +
- s"${input(i)} with datatype ${rowDataTypes(i)}")
- } else {
- // It is bad record ignore case
- InternalRow.empty
- }
+ while (i < inputLen) {
+ // TODO find a way to avoid double conversion of date and time.
+ data(i) = UTF8String.fromString(input(i))
+ i = i + 1
}
+ InternalRow.fromSeq(data)
+ }
+ // Only select the required columns
+ val output = if (partition.nonEmpty) {
+ catalogTable.schema.map { attr =>
+ attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+ }.filter(attr => partition.get(attr.name).isEmpty)
+ } else {
- catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
}
- // In bad record ignore case filter the empty values
- if (ignoreAction) {
- rdd = rdd.filter(f => f.numFields != 0)
+ Project(output, LogicalRDD(attributes, rdd)(sparkSession))
}
-
- // Only select the required columns
- val output = if (partition.nonEmpty) {
- catalogTable.schema.map{ attr =>
- attributes.find(_.name.equalsIgnoreCase(attr.name)).get
- }.filter(attr => partition.get(attr.name).isEmpty)
+ // TODO need to find a way to avoid double lookup
+ val sizeInBytes =
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
+ catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
+ val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
+ val convertRelation = convertToLogicalRelation(
+ catalogTable,
+ sizeInBytes,
+ isOverwriteTable,
+ carbonLoadModel,
+ sparkSession)
+ val convertedPlan =
+ CarbonReflectionUtils.getInsertIntoCommand(
+ convertRelation,
+ partition,
+ query,
+ false,
--- End diff --
ok
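
For context on the change under review: the added code publishes per-load settings (date/timestamp format, serialization null format, bad-records action, empty-data handling) as thread-local session options via CarbonSession.threadSet before building the logical plan inside a try block. Below is a minimal, self-contained Scala sketch of that set-then-clean-up pattern; ThreadLocalOptions and the option keys are hypothetical stand-ins for illustration, not the actual CarbonSession API, and the finally-based cleanup is an assumption about the part of the hunk not shown here.

    // Minimal sketch of the set/try/finally thread-local options pattern.
    // ThreadLocalOptions is a hypothetical stand-in, not the real CarbonSession API.
    import scala.collection.mutable

    object ThreadLocalOptions {
      // one mutable map of options per thread
      private val options = new ThreadLocal[mutable.Map[String, String]] {
        override def initialValue(): mutable.Map[String, String] = mutable.Map.empty
      }
      def threadSet(key: String, value: String): Unit = options.get.put(key, value)
      def threadGet(key: String): Option[String] = options.get.get(key)
      def threadUnset(key: String): Unit = options.get.remove(key)
    }

    object LoadOptionsExample extends App {
      // publish per-load settings for the current thread only
      ThreadLocalOptions.threadSet("carbon.options.dateformat", "yyyy-MM-dd")
      ThreadLocalOptions.threadSet("carbon.options.bad.records.action", "FAIL")
      try {
        // code that builds and runs the load plan reads the options back here
        println(ThreadLocalOptions.threadGet("carbon.options.dateformat"))
      } finally {
        // always clear thread-local state so a pooled thread does not keep stale options
        ThreadLocalOptions.threadUnset("carbon.options.dateformat")
        ThreadLocalOptions.threadUnset("carbon.options.bad.records.action")
      }
    }

The point of the pattern is isolation: each concurrent load sees only the options it set on its own thread, and the cleanup step prevents leakage when threads are reused.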
---