Repository: carbondata Updated Branches: refs/heads/master e40963254 -> 2f4dbb694
[CARBONDATA-2069] Restrict create datamap when load is in progress Problem: 1. Load data into the main table. 2. Create a datamap in parallel. The pre-aggregate table will not have any data even though the data load is successful for the main table. This makes the pre-aggregate table inconsistent. Solution: Restrict creation of a pre-aggregate table while a load is in progress on the main table. This closes #1850 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f4dbb69 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f4dbb69 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f4dbb69 Branch: refs/heads/master Commit: 2f4dbb694c512aaaaa56ff1f0ed576dc50ac9f9d Parents: e409632 Author: kunal642 <[email protected]> Authored: Tue Jan 23 18:52:48 2018 +0530 Committer: ravipesala <[email protected]> Committed: Sun Jan 28 13:01:03 2018 +0530 ---------------------------------------------------------------------- .../CreatePreAggregateTableCommand.scala | 25 ++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f4dbb69/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index c5340c2..a75a06f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -30,8 +30,10 
@@ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable} -import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.spark.util.DataLoadingUtil /** * Below command class will be used to create pre-aggregate table @@ -180,9 +182,24 @@ case class CreatePreAggregateTableCommand( // This will be used to check if the parent table has any segments or not. If not then no // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT // table. + DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable) val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath) - .nonEmpty - if (loadAvailable) { + if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS || + load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) { + throw new UnsupportedOperationException( + "Cannot create pre-aggregate table when insert is in progress on main table") + } else if (loadAvailable.nonEmpty) { + val updatedQuery = if (timeSeriesFunction.isDefined) { + val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala + .filter(p => p.getDataMapName + .equalsIgnoreCase(dataMapName)).head + .asInstanceOf[AggregationDataMapSchema] + PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, + parentTable.getTableName, + parentTable.getDatabaseName) + } else { + queryString + } // Passing segmentToLoad as * because we want to load all the 
segments into the // pre-aggregate table even if the user has set some segments on the parent table. loadCommand.dataFrame = Some(PreAggregateUtil
