Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1837#discussion_r163758021
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
 ---
    @@ -20,18 +20,141 @@ package 
org.apache.spark.sql.execution.command.preaaggregate
     import scala.collection.JavaConverters._
     import scala.collection.mutable
     
    -import org.apache.spark.sql.CarbonEnv
    -import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.{SparkSession}
     import org.apache.spark.sql.catalyst.TableIdentifier
     import org.apache.spark.sql.execution.command.AlterTableModel
    -import 
org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
    +import 
org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand,
 CarbonLoadDataCommand}
    +import org.apache.spark.sql.parser.CarbonSpark2SqlParser
     
    -import org.apache.carbondata.core.constants.CarbonCommonConstants
     import 
org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
     import org.apache.carbondata.core.util.CarbonUtil
     import org.apache.carbondata.events._
    -import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent,
 LoadTablePreStatusUpdateEvent}
    +import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, 
LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object CompactionProcessMetaListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext): Unit = {
    +    val sparkSession = SparkSession.getActiveSession.get
    +    val tableEvent = event.asInstanceOf[LoadMetadataEvent]
    +    val table = tableEvent.getCarbonTable
    +    if (!table.isChildDataMap && CarbonUtil.hasAggregationDataMap(table)) {
    +      val aggregationDataMapList = 
table.getTableInfo.getDataMapSchemaList.asScala
    +        .filter(_.isInstanceOf[AggregationDataMapSchema])
    +        .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
    +      for (dataMapSchema: AggregationDataMapSchema <- 
aggregationDataMapList) {
    +        val childTableName = 
dataMapSchema.getRelationIdentifier.getTableName
    +        val childDatabaseName = 
dataMapSchema.getRelationIdentifier.getDatabaseName
    +        // Creating a new query string to insert data into pre-aggregate 
table from that same table.
    +        // For example: To compact preaggtable1 we can fire a query like 
insert into preaggtable1
    +        // select * from preaggtable1
    +        // The following code will generate the select query with a load 
UDF that will be used to
    +        // apply DataLoadingRules
    +        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +          // adding the aggregation load UDF
    +          .addPreAggLoadFunction(
    +          // creating the select query on the basis of the table schema
    +        PreAggregateUtil.createChildSelectQuery(
    +          dataMapSchema.getChildSchema, 
table.getDatabaseName))).drop("preAggLoad")
    +        val loadCommand = PreAggregateUtil.createLoadCommandForChild(
    +          dataMapSchema.getChildSchema.getListOfColumns,
    +          TableIdentifier(childTableName, Some(childDatabaseName)),
    +          childDataFrame,
    +          false,
    +          sparkSession)
    +        operationContext
    +          .setProperty(dataMapSchema.getChildSchema.getTableName + 
"_Compaction", loadCommand)
    +      }
    +    } else {
    +      val childTableName = table.getTableName
    +      val childDatabaseName = table.getDatabaseName
    +      // Creating a new query string to insert data into pre-aggregate 
table from that same table.
    +      // For example: To compact preaggtable1 we can fire a query like 
insert into preaggtable1
    +      // select * from preaggtable1
    +      // The following code will generate the select query with a load UDF 
that will be used to
    +      // apply DataLoadingRules
    +      val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +        // adding the aggregation load UDF
    +        .addPreAggLoadFunction(
    +        // creating the select query on the basis of the table schema
    +        PreAggregateUtil.createChildSelectQuery(
    +          table.getTableInfo.getFactTable, 
table.getDatabaseName))).drop("preAggLoad")
    +      val loadCommand = PreAggregateUtil.createLoadCommandForChild(
    +        table.getTableInfo.getFactTable.getListOfColumns,
    +        TableIdentifier(childTableName, Some(childDatabaseName)),
    +        childDataFrame,
    +        false,
    +        sparkSession)
    +      operationContext.setProperty(table.getTableName + "_Compaction", 
loadCommand)
    +    }
    +  }
    +}
    +object LoadProcessMetaListener extends OperationEventListener {
    --- End diff --
    
    Add comment


---

Reply via email to