Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163751453
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
 ---
    @@ -57,30 +58,16 @@ class AggregateDataMapCompactor(carbonLoadModel: 
CarbonLoadModel,
             CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
             carbonLoadModel.getDatabaseName + "." +
             carbonLoadModel.getTableName, "false")
    -      val headers = 
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
    -        .map(_.getColumnName).mkString(",")
    -      // Creating a new query string to insert data into pre-aggregate 
table from that same table.
    -      // For example: To compact preaggtable1 we can fire a query like 
insert into preaggtable1
    -      // select * from preaggtable1
    -      // The following code will generate the select query with a load UDF 
that will be used to
    -      // apply DataLoadingRules
    -      val childDataFrame = sqlContext.sparkSession.sql(new 
CarbonSpark2SqlParser()
    -        // adding the aggregation load UDF
    -        .addPreAggLoadFunction(
    -        // creating the select query on the bases on table schema
    -        PreAggregateUtil.createChildSelectQuery(
    -          carbonTable.getTableInfo.getFactTable, 
carbonTable.getDatabaseName))).drop("preAggLoad")
    +      
CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
    +      val loadCommand = 
operationContext.getProperty(carbonTable.getTableName + "_Compaction")
    +        .asInstanceOf[CarbonLoadDataCommand]
           try {
    -        CarbonLoadDataCommand(
    -          Some(carbonTable.getDatabaseName),
    -          carbonTable.getTableName,
    -          null,
    -          Nil,
    -          Map("fileheader" -> headers),
    -          isOverwriteTable = false,
    -          dataFrame = Some(childDataFrame),
    -          internalOptions = 
Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
    -            "mergedSegmentName" -> 
mergedLoadName)).run(sqlContext.sparkSession)
    +        val newInternalOptions = loadCommand.internalOptions ++
    +                                 Map("mergedSegmentName" -> mergedLoadName)
    +        loadCommand.internalOptions = newInternalOptions
    +        loadCommand.dataFrame = Some(PreAggregateUtil
    --- End diff --
    
    Please correct the formatting as follows:
    ```
    loadCommand.dataFrame = 
              Some(PreAggregateUtil.getDataFrame(sqlContext.sparkSession, 
loadCommand.logicalPlan.get))
    ```


---

Reply via email to the sender.