[CARBONDATA-1719][Pre-Aggregate][Bug] Fixed bug to handle data inconsistency on concurrent data load and pre-aggregate table creation
Problem: During concurrent data load and pre-aggregate table creation, the datamap was not populated with the loaded data even after the data load completed. This closes #1697 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/69de3874 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/69de3874 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/69de3874 Branch: refs/heads/branch-1.3 Commit: 69de3874a5ebb6031bea5e5c3f61ce2d6aba69d5 Parents: 0c65aba Author: SangeetaGulia <[email protected]> Authored: Wed Dec 20 18:02:51 2017 +0530 Committer: kumarvishal <[email protected]> Committed: Thu Jan 4 17:59:25 2018 +0530 ---------------------------------------------------------------------- .../command/preaaggregate/PreAggregateListeners.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/69de3874/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index fc42d5f..5e232f6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.command.preaaggregate import scala.collection.JavaConverters._ import scala.collection.mutable +import org.apache.spark.sql.CarbonEnv import org.apache.spark.sql.catalyst.TableIdentifier -import 
org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand import org.apache.spark.sql.execution.command.AlterTableModel +import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.util.CarbonUtil @@ -38,7 +39,8 @@ object LoadPostAggregateListener extends OperationEventListener { val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent] val sparkSession = loadEvent.sparkSession val carbonLoadModel = loadEvent.carbonLoadModel - val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val table = CarbonEnv.getCarbonTable(Option(carbonLoadModel.getDatabaseName), + carbonLoadModel.getTableName)(sparkSession) if (CarbonUtil.hasAggregationDataMap(table)) { // getting all the aggergate datamap schema val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala
