Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1559#discussion_r156873416
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
@@ -348,36 +347,53 @@ object GlobalDictionaryUtil {
}
/**
- * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
+ * load and prune the dictionary RDD from CSV files or the input DataFrame
*
- * @param sqlContext SQLContext
- * @param carbonLoadModel carbon data load model
+ * @param sqlContext SQLContext
+ * @param carbonLoadModel carbon data load model
+ * @param inputDF input DataFrame
+ * @param requiredCols names of the dictionary columns
+ * @param hadoopConf hadoop configuration
+ * @return RDD that contains only the dictionary columns
*/
- def loadDataFrame(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- hadoopConf: Configuration): DataFrame = {
- CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
- hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
- val columnNames = carbonLoadModel.getCsvHeaderColumns
- val schema = StructType(columnNames.map[StructField, Array[StructField]] { column =>
- StructField(column, StringType)
- })
- val values = new Array[String](columnNames.length)
- val row = new StringArrayRow(values)
- val jobConf = new JobConf(hadoopConf)
- SparkHadoopUtil.get.addCredentials(jobConf)
- TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
- Array[Path](new Path(carbonLoadModel.getFactFilePath)),
- jobConf)
- val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
- sqlContext.sparkContext,
- classOf[CSVInputFormat],
- classOf[NullWritable],
- classOf[StringArrayWritable],
- jobConf).setName("global dictionary").map[Row] { currentRow =>
- row.setValues(currentRow._2.get())
+ private def loadInputDataAsDictRdd(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel,
+ inputDF: Option[DataFrame], requiredCols: Array[String],
+ hadoopConf: Configuration): RDD[Row] = {
+ if (inputDF.isDefined) {
+ inputDF.get.select(requiredCols.head, requiredCols.tail : _*).rdd
+ } else {
+ CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+ val headerCols = carbonLoadModel.getCsvHeaderColumns.map(_.toLowerCase)
+ val header2Idx = headerCols.zipWithIndex.toMap
+ // index of dictionary columns in header
+ val dictColIdx = requiredCols.map(c => header2Idx(c.toLowerCase))
+
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+ Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+ jobConf)
+ val dictRdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
+ sqlContext.sparkContext,
+ classOf[CSVInputFormat],
+ classOf[NullWritable],
+ classOf[StringArrayWritable],
+ jobConf).setName("global dictionary").map[Row] { currentRow =>
--- End diff ---
move setName and map to separate lines
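For example, something along these lines (a rough sketch only: it reuses jobConf and dictColIdx from the surrounding method, the csvRdd name is illustrative, and the map body is not visible in this hunk, so the row projection is just an assumed placeholder):

    // build the hadoop-backed RDD first, then name it and project the
    // dictionary columns in separate statements
    val csvRdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
      sqlContext.sparkContext,
      classOf[CSVInputFormat],
      classOf[NullWritable],
      classOf[StringArrayWritable],
      jobConf)
    csvRdd.setName("global dictionary")
    val dictRdd = csvRdd.map { currentRow =>
      // keep only the dictionary columns of the parsed CSV row
      // (assumed body: the actual projection sits below this hunk)
      Row.fromSeq(dictColIdx.map(idx => currentRow._2.get()(idx)))
    }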
---