Repository: carbondata
Updated Branches:
  refs/heads/master 804ddb76f -> e6497e14e


[CARBONDATA-1743] fix concurrent pre-agg creation and query

When a pre-aggregate table is created and its load is still in progress, an 
aggregate query on the main table tries to read from the pre-aggregate table, 
which used to give empty results. If the size of the pre-aggregate table is 0 
(meaning no data is loaded), hit the main table instead of the aggregate table.

This closes #1521


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e6497e14
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e6497e14
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e6497e14

Branch: refs/heads/master
Commit: e6497e14e6454f0990354f542e0e7f2d888b9725
Parents: 804ddb7
Author: kunal642 <[email protected]>
Authored: Fri Nov 17 12:13:25 2017 +0530
Committer: kumarvishal <[email protected]>
Committed: Tue Dec 19 19:29:03 2017 +0530

----------------------------------------------------------------------
 .../TestPreAggregateTableSelection.scala        | 17 ++++-----
 .../TestTimeseriesTableSelection.scala          | 32 +++++++++--------
 .../sql/hive/CarbonPreAggregateRules.scala      | 38 ++++++++++++--------
 .../apache/spark/sql/hive/CarbonRelation.scala  | 19 ++++++----
 4 files changed, 63 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 5b36826..7157b8a 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -27,7 +27,14 @@ class TestPreAggregateTableSelection extends QueryTest with 
BeforeAndAfterAll {
 
   override def beforeAll: Unit = {
     sql("drop table if exists mainTable")
-    sql("drop table if exists lineitem")
+    sql("drop table if exists agg0")
+    sql("drop table if exists agg1")
+    sql("drop table if exists agg2")
+    sql("drop table if exists agg3")
+    sql("drop table if exists agg4")
+    sql("drop table if exists agg5")
+    sql("drop table if exists agg6")
+    sql("drop table if exists agg7")
     sql("CREATE TABLE mainTable(id int, name string, city string, age string) 
STORED BY 'org.apache.carbondata.format'")
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select 
name from mainTable group by name")
     sql("create datamap agg1 on table mainTable using 'preaggregate' as select 
name,sum(age) from mainTable group by name")
@@ -39,8 +46,6 @@ class TestPreAggregateTableSelection extends QueryTest with 
BeforeAndAfterAll {
     sql("create datamap agg7 on table mainTable using 'preaggregate' as select 
name,max(age) from mainTable group by name")
     sql("create datamap agg8 on table maintable using 'preaggregate' as select 
name, sum(id), avg(id) from maintable group by name")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' 
into table mainTable")
-    sql("create table if not exists lineitem(L_SHIPDATE string,L_SHIPMODE 
string,L_SHIPINSTRUCT string,L_RETURNFLAG string,L_RECEIPTDATE 
string,L_ORDERKEY string,L_PARTKEY string,L_SUPPKEY string,L_LINENUMBER 
int,L_QUANTITY double,L_EXTENDEDPRICE double,L_DISCOUNT double,L_TAX 
double,L_LINESTATUS string,L_COMMITDATE string,L_COMMENT string) STORED BY 
'org.apache.carbondata.format'TBLPROPERTIES 
('table_blocksize'='128','NO_INVERTED_INDEX'='L_SHIPDATE,L_SHIPMODE,L_SHIPINSTRUCT,L_RETURNFLAG,L_RECEIPTDATE,L_ORDERKEY,L_PARTKEY,L_SUPPKEY','sort_columns'='')")
-    sql("create datamap agr_lineitem ON TABLE lineitem USING 'preaggregate' as 
select L_RETURNFLAG,L_LINESTATUS,sum (L_QUANTITY),sum(L_EXTENDEDPRICE) from 
lineitem group by L_RETURNFLAG, L_LINESTATUS")
   }
 
   test("test sum and avg on same column should give proper results") {
@@ -138,11 +143,7 @@ class TestPreAggregateTableSelection extends QueryTest 
with BeforeAndAfterAll {
     val df = sql("select count(id) from mainTable")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
   }
-
-  test("test PreAggregate table selection 19") {
-    val df = sql("select 
L_RETURNFLAG,L_LINESTATUS,sum(L_QUANTITY),sum(L_EXTENDEDPRICE) from lineitem 
group by L_RETURNFLAG, L_LINESTATUS")
-    preAggTableValidator(df.queryExecution.analyzed, "lineitem_agr_lineitem")
-  }
+  
   test("test PreAggregate table selection 20") {
     val df = sql("select name from mainTable group by name order by name")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index 0990f87..725ac24 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -28,71 +28,73 @@ class TestTimeseriesTableSelection extends QueryTest with 
BeforeAndAfterAll {
 
   override def beforeAll: Unit = {
     sql("drop table if exists mainTable")
-    sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, 
age int) STORED BY 'org.apache.carbondata.format'")
-    sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='dataTime', 
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as 
select dataTime, sum(age) from mainTable group by dataTime")
+    sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED 
BY 'org.apache.carbondata.format'")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='mytime', 
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as 
select mytime, sum(age) from mainTable group by mytime")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")
   }
+
   test("test PreAggregate table selection 1") {
-    val df = sql("select dataTime from mainTable group by dataTime")
+    val df = sql("select mytime from mainTable group by mytime")
     preAggTableValidator(df.queryExecution.analyzed, "maintable")
   }
 
   test("test PreAggregate table selection 2") {
-    val df = sql("select timeseries(dataTime,'hour') from mainTable group by 
timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour') from mainTable group by 
timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 3") {
-    val df = sql("select timeseries(dataTime,'milli') from mainTable group by 
timeseries(dataTime,'milli')")
+    val df = sql("select timeseries(mytime,'milli') from mainTable group by 
timeseries(mytime,'milli')")
     preAggTableValidator(df.queryExecution.analyzed, "maintable")
   }
 
   test("test PreAggregate table selection 4") {
-    val df = sql("select timeseries(dataTime,'year') from mainTable group by 
timeseries(dataTime,'year')")
+    val df = sql("select timeseries(mytime,'year') from mainTable group by 
timeseries(mytime,'year')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_year")
   }
 
   test("test PreAggregate table selection 5") {
-    val df = sql("select timeseries(dataTime,'day') from mainTable group by 
timeseries(dataTime,'day')")
+    val df = sql("select timeseries(mytime,'day') from mainTable group by 
timeseries(mytime,'day')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_day")
   }
 
   test("test PreAggregate table selection 6") {
-    val df = sql("select timeseries(dataTime,'month') from mainTable group by 
timeseries(dataTime,'month')")
+    val df = sql("select timeseries(mytime,'month') from mainTable group by 
timeseries(mytime,'month')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_month")
   }
 
   test("test PreAggregate table selection 7") {
-    val df = sql("select timeseries(dataTime,'minute') from mainTable group by 
timeseries(dataTime,'minute')")
+    val df = sql("select timeseries(mytime,'minute') from mainTable group by 
timeseries(mytime,'minute')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_minute")
   }
 
   test("test PreAggregate table selection 8") {
-    val df = sql("select timeseries(dataTime,'second') from mainTable group by 
timeseries(dataTime,'second')")
+    val df = sql("select timeseries(mytime,'second') from mainTable group by 
timeseries(mytime,'second')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_second")
   }
 
   test("test PreAggregate table selection 9") {
-    val df = sql("select timeseries(dataTime,'hour') from mainTable where 
timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour') from mainTable where 
timeseries(mytime,'hour')='x' group by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 10") {
-    val df = sql("select timeseries(dataTime,'hour') from mainTable where 
timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by 
timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour') from mainTable where 
timeseries(mytime,'hour')='x' group by timeseries(mytime,'hour') order by 
timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 11") {
-    val df = sql("select timeseries(dataTime,'hour'),sum(age) from mainTable 
where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') 
order by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour'),sum(age) from mainTable 
where timeseries(mytime,'hour')='x' group by timeseries(mytime,'hour') order by 
timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 12") {
-    val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as 
sum from mainTable where timeseries(dataTime,'hour')='x' group by 
timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour')as hourlevel,sum(age) as sum 
from mainTable where timeseries(mytime,'hour')='x' group by 
timeseries(mytime,'hour') order by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour")
   }
 
   test("test PreAggregate table selection 13") {
-    val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as 
sum from mainTable where timeseries(dataTime,'hour')='x' and name='vishal' 
group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')")
+    val df = sql("select timeseries(mytime,'hour')as hourlevel,sum(age) as sum 
from mainTable where timeseries(mytime,'hour')='x' and name='vishal' group by 
timeseries(mytime,'hour') order by timeseries(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed,"maintable")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index c28cc44..76c39a4 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -296,21 +296,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
         val selectedDataMapSchemas = 
aggregateTableSelector.selectPreAggDataMapSchema()
         // if it does not match with any pre aggregate table return the same 
plan
         if (!selectedDataMapSchemas.isEmpty) {
-          // sort the selected child schema based on size to select smallest 
pre aggregate table
+          // filter the selected child schema based on size to select the 
pre-aggregate tables
+          // that are nonEmpty
           val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-          val (aggDataMapSchema, carbonRelation, relation) =
-            selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
-              val identifier = TableIdentifier(
-                selectedDataMapSchema.getRelationIdentifier.getTableName,
-                
Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
-              val carbonRelation =
-                
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
-              val relation = 
sparkSession.sessionState.catalog.lookupRelation(identifier)
-              (selectedDataMapSchema, carbonRelation, relation)
-            }.minBy(f => f._2.sizeInBytes)
-          val newRelation = new 
FindDataSourceTable(sparkSession).apply(relation)
-          // transform the query plan based on selected child schema
-          transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
+          val relationBuffer = selectedDataMapSchemas.asScala.map { 
selectedDataMapSchema =>
+            val identifier = TableIdentifier(
+              selectedDataMapSchema.getRelationIdentifier.getTableName,
+              
Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
+            val carbonRelation =
+              
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+            val relation = 
sparkSession.sessionState.catalog.lookupRelation(identifier)
+            (selectedDataMapSchema, carbonRelation, relation)
+          }.filter(_._2.sizeInBytes != 0L)
+          if (relationBuffer.isEmpty) {
+            // If the size of relation Buffer is 0 then it means that none of 
the pre-aggregate
+            // tables have data yet.
+            // In this case we would return the original plan so that the 
query hits the parent
+            // table.
+            plan
+          } else {
+            // If the relationBuffer is nonEmpty then find the table with the 
minimum size.
+            val (aggDataMapSchema, _, relation) = 
relationBuffer.minBy(_._2.sizeInBytes)
+            val newRelation = new 
FindDataSourceTable(sparkSession).apply(relation)
+            // transform the query plan based on selected child schema
+            transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
+          }
         } else {
           plan
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6497e14/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index aadce98..87be2d2 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
@@ -205,12 +206,18 @@ case class CarbonRelation(
       carbonTable.getAbsoluteTableIdentifier)
 
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getAbsoluteTableIdentifier).getPath
-      val fileType = FileFactory.getFileType(tablePath)
-      if(FileFactory.isFileExist(tablePath, fileType)) {
-        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
-        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+      if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+        .getValidAndInvalidSegments.getValidSegments.isEmpty) {
+        sizeInBytesLocalValue = 0L
+      } else {
+        val tablePath = CarbonStorePath.getCarbonTablePath(
+          carbonTable.getTablePath,
+          carbonTable.getCarbonTableIdentifier).getPath
+        val fileType = FileFactory.getFileType(tablePath)
+        if (FileFactory.isFileExist(tablePath, fileType)) {
+          tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
+          sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
+        }
       }
     }
     sizeInBytesLocalValue

Reply via email to