Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163751453
--- Diff:
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
---
@@ -57,30 +58,16 @@ class AggregateDataMapCompactor(carbonLoadModel:
CarbonLoadModel,
CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
carbonLoadModel.getDatabaseName + "." +
carbonLoadModel.getTableName, "false")
- val headers =
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
- .map(_.getColumnName).mkString(",")
- // Creating a new query string to insert data into pre-aggregate
table from that same table.
- // For example: To compact preaggtable1 we can fire a query like
insert into preaggtable1
- // select * from preaggtable1
- // The following code will generate the select query with a load UDF
that will be used to
- // apply DataLoadingRules
- val childDataFrame = sqlContext.sparkSession.sql(new
CarbonSpark2SqlParser()
- // adding the aggregation load UDF
- .addPreAggLoadFunction(
- // creating the select query on the bases on table schema
- PreAggregateUtil.createChildSelectQuery(
- carbonTable.getTableInfo.getFactTable,
carbonTable.getDatabaseName))).drop("preAggLoad")
+
CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
+ val loadCommand =
operationContext.getProperty(carbonTable.getTableName + "_Compaction")
+ .asInstanceOf[CarbonLoadDataCommand]
try {
- CarbonLoadDataCommand(
- Some(carbonTable.getDatabaseName),
- carbonTable.getTableName,
- null,
- Nil,
- Map("fileheader" -> headers),
- isOverwriteTable = false,
- dataFrame = Some(childDataFrame),
- internalOptions =
Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
- "mergedSegmentName" ->
mergedLoadName)).run(sqlContext.sparkSession)
+ val newInternalOptions = loadCommand.internalOptions ++
+ Map("mergedSegmentName" -> mergedLoadName)
+ loadCommand.internalOptions = newInternalOptions
+ loadCommand.dataFrame = Some(PreAggregateUtil
--- End diff --
Please correct the formatting, for example:
```
loadCommand.dataFrame =
Some(PreAggregateUtil.getDataFrame(sqlContext.sparkSession,
loadCommand.logicalPlan.get))
```
---