GitHub user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1837#discussion_r163758021
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
---
@@ -20,18 +20,141 @@ package
org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.JavaConverters._
import scala.collection.mutable
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableModel
-import
org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import
org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand,
CarbonLoadDataCommand}
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import
org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
-import
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent,
LoadTablePreStatusUpdateEvent}
+import
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent,
LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
/**
 * Prepares the load commands used to compact pre-aggregate (child) tables.
 *
 * On a [[LoadMetadataEvent]] it builds, for every pre-aggregate table that has to be compacted,
 * a load command (via `PreAggregateUtil.createLoadCommandForChild`) that re-inserts the table's
 * own aggregated data into itself. Each command is stored in the [[OperationContext]] under the
 * key `<childTableName>_Compaction`; presumably a later compaction step picks it up from there
 * (not visible here — confirm against the consumer).
 */
object CompactionProcessMetaListener extends OperationEventListener {

  /**
   * Called on a specified event occurrence.
   *
   *  - For a non-child table where `CarbonUtil.hasAggregationDataMap` is true, one load command
   *    is prepared per [[AggregationDataMapSchema]] in the table's datamap schema list.
   *  - For a child (pre-aggregate) table itself, one load command is prepared for that table.
   *  - For any other table nothing is prepared.
   *
   * @param event            expected to be a [[LoadMetadataEvent]]; other types fail the cast
   * @param operationContext receives one `<tableName>_Compaction` property per prepared command
   */
  override protected def onEvent(event: Event,
      operationContext: OperationContext): Unit = {
    val sparkSession = SparkSession.getActiveSession.get
    val tableEvent = event.asInstanceOf[LoadMetadataEvent]
    val table = tableEvent.getCarbonTable
    // Each target is (schema to rebuild the select query from, table to load into,
    // operation-context key under which the load command is stored).
    val compactionTargets =
      if (!table.isChildDataMap && CarbonUtil.hasAggregationDataMap(table)) {
        // collect keeps only aggregation datamaps without relying on the runtime collection type
        table.getTableInfo.getDataMapSchemaList.asScala.collect {
          case dataMapSchema: AggregationDataMapSchema =>
            val relation = dataMapSchema.getRelationIdentifier
            (dataMapSchema.getChildSchema,
              TableIdentifier(relation.getTableName, Some(relation.getDatabaseName)),
              dataMapSchema.getChildSchema.getTableName + "_Compaction")
        }
      } else if (table.isChildDataMap) {
        Seq((table.getTableInfo.getFactTable,
          TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
          table.getTableName + "_Compaction"))
      } else {
        // A plain table without aggregation datamaps has no pre-aggregate data to compact,
        // so no select query is executed and no load command is registered.
        Seq.empty
      }
    compactionTargets.foreach { case (childSchema, childIdentifier, contextKey) =>
      // Creating a new query string to insert data into the pre-aggregate table from that same
      // table. For example, to compact preaggtable1 we can fire a query like
      // `insert into preaggtable1 select * from preaggtable1`.
      // The generated select query is wrapped with the load UDF that is used to apply the
      // DataLoadingRules; the UDF's "preAggLoad" column is dropped from the resulting DataFrame.
      val selectQuery = new CarbonSpark2SqlParser().addPreAggLoadFunction(
        // creating the select query on the basis of the table schema
        PreAggregateUtil.createChildSelectQuery(childSchema, table.getDatabaseName))
      val childDataFrame = sparkSession.sql(selectQuery).drop("preAggLoad")
      val loadCommand = PreAggregateUtil.createLoadCommandForChild(
        childSchema.getListOfColumns,
        childIdentifier,
        childDataFrame,
        false,
        sparkSession)
      operationContext.setProperty(contextKey, loadCommand)
    }
  }
}
+object LoadProcessMetaListener extends OperationEventListener {
--- End diff --
Please add a comment (Scaladoc) describing what this listener does.
---