Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1672#discussion_r157346999
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -345,23 +380,172 @@ case class CarbonLoadDataCommand(
       } else {
         (dataFrame, dataFrame)
       }
-      if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
+      val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      if (!table.isChildDataMap) {
         GlobalDictionaryUtil.generateGlobalDictionary(
           sparkSession.sqlContext,
           carbonLoadModel,
           hadoopConf,
           dictionaryDataFrame)
       }
-      CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
-        carbonLoadModel,
-        columnar,
-        partitionStatus,
-        None,
-        isOverwriteTable,
-        hadoopConf,
-        loadDataFrame,
-        updateModel,
-        operationContext)
+      if (table.isStandardPartitionTable) {
+        loadStandardPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+      } else {
+        CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+          carbonLoadModel,
+          columnar,
+          partitionStatus,
+          None,
+          isOverwriteTable,
+          hadoopConf,
+          loadDataFrame,
+          updateModel,
+          operationContext)
+      }
+    }
+
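+  /**
+   * Loads data into a standard (Hive style) partitioned table: builds a string-typed
+   * logical plan from the input and runs it as an InsertIntoTable against the relation
+   * returned by convertToLogicalRelation.
+   */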
+  private def loadStandardPartition(sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      dataFrame: Option[DataFrame]) = {
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val logicalPlan =
+      sparkSession.sessionState.catalog.lookupRelation(
+        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
+    val relation = logicalPlan.collect {
+      case l: LogicalRelation => l
+    }.head
+
+
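+    // Build the plan to insert: either wrap the rows of the supplied DataFrame, or read the
+    // CSV fact files through CSVInputFormat; rows are converted with CarbonScalaUtil.getString
+    // so they match the string-typed attributes.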
+    val query: LogicalPlan = if (dataFrame.isDefined) {
+      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
+      val dateFormat = new SimpleDateFormat(dateFormatString)
+      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+      val serializationNullFormat =
+        carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+      val attributes =
+        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+      val len = attributes.length
+      val rdd = dataFrame.get.rdd.map { f =>
+        val data = new Array[Any](len)
+        var i = 0
+        while (i < len) {
+          data(i) =
+            UTF8String.fromString(
+              CarbonScalaUtil.getString(f.get(i),
+                serializationNullFormat,
+                delimiterLevel1,
+                delimiterLevel2,
+                timeStampFormat,
+                dateFormat))
+          i = i + 1
+        }
+        InternalRow.fromSeq(data)
+      }
+      LogicalRDD(attributes, rdd)(sparkSession)
+
+
+    } else {
+      var timeStampformatString = carbonLoadModel.getTimestampformat
+      if (timeStampformatString.isEmpty) {
+        timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+      }
+      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+      var dateFormatString = carbonLoadModel.getDateFormat
+      if (dateFormatString.isEmpty) {
+        dateFormatString = carbonLoadModel.getDefaultDateFormat
+      }
+      val dateFormat = new SimpleDateFormat(dateFormatString)
+      // input data from csv files. Convert to logical plan
+      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
+      val attributes =
+        StructType(carbonLoadModel.getCsvHeaderColumns.map(
+          StructField(_, StringType))).toAttributes
+      val rowDataTypes = attributes.map { f =>
+        relation.output.find(_.name.equalsIgnoreCase(f.name)) match {
+          case Some(attr) => attr.dataType
+          case _ => StringType
+        }
+      }
+      val len = rowDataTypes.length
+      val rdd =
+        new NewHadoopRDD[NullWritable, StringArrayWritable](
+          sparkSession.sparkContext,
+          classOf[CSVInputFormat],
+          classOf[NullWritable],
+          classOf[StringArrayWritable],
+          jobConf).map { f =>
+          val data = new Array[Any](len)
+          var i = 0
+          while (i < len) {
+            // TODO find a way to avoid double conversion of date and time.
+            data(i) = CarbonScalaUtil.getString(
+              f._2.get()(i),
+              rowDataTypes(i),
+              timeStampFormat,
+              dateFormat)
+            i = i + 1
+          }
+          InternalRow.fromSeq(data)
+        }
+
+      // Only select the required columns
+      Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
+        LogicalRDD(attributes, rdd)(sparkSession))
+    }
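+    // Run the load as a Spark InsertIntoTable command on the partition-aware relation.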
+    Dataset.ofRows(sparkSession, InsertIntoTable(
+      convertToLogicalRelation(relation, isOverwriteTable, carbonLoadModel, sparkSession),
+      partition,
+      query,
+      OverwriteOptions(isOverwriteTable), false))
+  }
+
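+  /**
+   * Rebuilds the carbon relation as a partition-aware LogicalRelation: creates a
+   * CatalogFileIndex for the catalog table and derives the partition schema from the
+   * table's PartitionInfo.
+   */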
+  private def convertToLogicalRelation(
+      relation: LogicalRelation,
+      overWrite: Boolean,
+      loadModel: CarbonLoadModel,
+      sparkSession: SparkSession): LogicalRelation = {
+    val catalogTable = relation.catalogTable.get
+    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
+    val metastoreSchema = StructType(StructType.fromAttributes(
+      relation.output).fields.map(_.copy(dataType = StringType)))
+    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
+    val catalog = new CatalogFileIndex(
+      sparkSession, catalogTable, relation.relation.sizeInBytes)
+    if (lazyPruningEnabled) {
+      catalog
+    } else {
+      catalog.filterPartitions(Nil) // materialize all the partitions in memory
+    }
+    val partitionSchema =
+      StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(f =>
+        metastoreSchema.fields.find(_.name.equalsIgnoreCase(f.getColumnName))).map(_.get))
+
--- End diff --
ok
---