Repository: carbondata
Updated Branches:
  refs/heads/master e40963254 -> 2f4dbb694


[CARBONDATA-2069] Restrict create datamap when load is in progress

Problem:
1. Load data into the main table.
2. Create a datamap in parallel, while that load is still running.
The pre-aggregate table will not contain any data even though the data load 
on the main table succeeds. This leaves the pre-aggregate table inconsistent 
with the main table.

Solution: Reject creation of a pre-aggregate table while a load (insert or 
insert overwrite) is in progress on the main table.

This closes #1850


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f4dbb69
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f4dbb69
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f4dbb69

Branch: refs/heads/master
Commit: 2f4dbb694c512aaaaa56ff1f0ed576dc50ac9f9d
Parents: e409632
Author: kunal642 <[email protected]>
Authored: Tue Jan 23 18:52:48 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Sun Jan 28 13:01:03 2018 +0530

----------------------------------------------------------------------
 .../CreatePreAggregateTableCommand.scala        | 25 ++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f4dbb69/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index c5340c2..a75a06f 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -30,8 +30,10 @@ import 
org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import 
org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, 
CarbonTable}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import 
org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager}
+import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -180,9 +182,24 @@ case class CreatePreAggregateTableCommand(
     // This will be used to check if the parent table has any segments or not. 
If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load 
details for PARENT
     // table.
+    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, 
parentTable)
     val loadAvailable = 
SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
-      .nonEmpty
-    if (loadAvailable) {
+    if (loadAvailable.exists(load => load.getSegmentStatus == 
SegmentStatus.INSERT_IN_PROGRESS ||
+      load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
+      throw new UnsupportedOperationException(
+        "Cannot create pre-aggregate table when insert is in progress on main 
table")
+    } else if (loadAvailable.nonEmpty) {
+      val updatedQuery = if (timeSeriesFunction.isDefined) {
+        val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala
+          .filter(p => p.getDataMapName
+            .equalsIgnoreCase(dataMapName)).head
+          .asInstanceOf[AggregationDataMapSchema]
+        
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
+          parentTable.getTableName,
+          parentTable.getDatabaseName)
+      } else {
+        queryString
+      }
       // Passing segmentToLoad as * because we want to load all the segments 
into the
       // pre-aggregate table even if the user has set some segments on the 
parent table.
       loadCommand.dataFrame = Some(PreAggregateUtil

Reply via email to