Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157339682
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
} else {
(dataFrame, dataFrame)
}
- if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
+ val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ if (!table.isChildDataMap) {
GlobalDictionaryUtil.generateGlobalDictionary(
sparkSession.sqlContext,
carbonLoadModel,
hadoopConf,
dictionaryDataFrame)
}
- CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
- carbonLoadModel,
- columnar,
- partitionStatus,
- None,
- isOverwriteTable,
- hadoopConf,
- loadDataFrame,
- updateModel,
- operationContext)
+ if (table.isStandardPartitionTable) {
+ loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+ } else {
+ CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+ carbonLoadModel,
+ columnar,
+ partitionStatus,
+ None,
+ isOverwriteTable,
+ hadoopConf,
+ loadDataFrame,
+ updateModel,
+ operationContext)
+ }
+ }
+
+ private def loadStandardPartition(sparkSession: SparkSession,
+ carbonLoadModel: CarbonLoadModel,
+ hadoopConf: Configuration,
+ dataFrame: Option[DataFrame]) = {
+ val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val logicalPlan =
+ sparkSession.sessionState.catalog.lookupRelation(
+ TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
+ val relation = logicalPlan.collect {
+ case l: LogicalRelation => l
+ }.head
+
+
+ val query: LogicalPlan = if (dataFrame.isDefined) {
+ var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+ val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+ var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
+ val dateFormat = new SimpleDateFormat(dateFormatString)
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ val attributes =
+ StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+ val len = attributes.length
+ val rdd = dataFrame.get.rdd.map { f =>
+ val data = new Array[Any](len)
+ var i = 0
+ while (i < len) {
+ data(i) =
+ UTF8String.fromString(
+ CarbonScalaUtil.getString(f.get(i),
+ serializationNullFormat,
+ delimiterLevel1,
+ delimiterLevel2,
+ timeStampFormat,
+ dateFormat))
+ i = i + 1
+ }
+ InternalRow.fromSeq(data)
+ }
+ LogicalRDD(attributes, rdd)(sparkSession)
+
+ } else {
+ var timeStampformatString = carbonLoadModel.getTimestampformat
+ if (timeStampformatString.isEmpty) {
+ timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+ }
+ val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+ var dateFormatString = carbonLoadModel.getDateFormat
+ if (dateFormatString.isEmpty) {
+ dateFormatString = carbonLoadModel.getDefaultDateFormat
+ }
+ val dateFormat = new SimpleDateFormat(dateFormatString)
+ // input data from csv files. Convert to logical plan
+ CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val attributes =
+ StructType(carbonLoadModel.getCsvHeaderColumns.map(StructField(_, StringType))).toAttributes
+ val rowDataTypes = attributes.map{f =>
+ relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
+ case Some(attr) => attr.dataType
+ case _ => StringType
+ }
+ }
+ val len = rowDataTypes.length
+ val rdd =
+ new NewHadoopRDD[NullWritable, StringArrayWritable](
+ sparkSession.sparkContext,
+ classOf[CSVInputFormat],
+ classOf[NullWritable],
+ classOf[StringArrayWritable],
+ jobConf).map{f =>
+ val data = new Array[Any](len)
+ var i = 0
+ while (i < len) {
+ // TODO find a way to avoid double conversion of date and time.
+ data(i) = CarbonScalaUtil.getString(
+ f._2.get()(i),
+ rowDataTypes(i),
+ timeStampFormat,
+ dateFormat)
+ i = i + 1
+ }
+ InternalRow.fromSeq(data)
+ }
+
+ // Only select the required columns
+ Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
+ LogicalRDD(attributes, rdd)(sparkSession))
+ }
+ Dataset.ofRows(sparkSession, InsertIntoTable(
--- End diff ---
move parameter to separate line
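
For illustration, the Dataset.ofRows(sparkSession, InsertIntoTable( call this comment anchors on could be wrapped with each parameter on its own line, roughly like the sketch below (the remaining InsertIntoTable arguments are cut off in the quoted hunk, so a placeholder comment stands in for them):

    // Sketch only: one argument per line, as requested in the review comment.
    Dataset.ofRows(
      sparkSession,
      InsertIntoTable(
        /* remaining arguments as in the original call */))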
---