[CARBONDATA-2042][PreAggregate] Fixed data mismatch issue in case of timeseries

Problem: Year, month and day level timeseries tables give wrong results.
Solution: The timeseries UDF was not able to convert data when the hour is in
24-hour format; it now resets Calendar.HOUR_OF_DAY instead of Calendar.HOUR.

This closes #1820


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aac7af73
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aac7af73
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aac7af73

Branch: refs/heads/carbonstore
Commit: aac7af7333aabd3b94e5e91c49f3f3d766103048
Parents: bc305c1
Author: kumarvishal <[email protected]>
Authored: Wed Jan 17 14:34:56 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Fri Jan 19 12:34:19 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/preagg/TimeSeriesUDF.java   |  4 ++--
 .../src/test/resources/data_sort.csv            | 21 ++++++++++++++++++++
 .../timeseries/TestTimeseriesDataLoad.scala     | 21 +++++++++++++++++++-
 .../CreatePreAggregateTableCommand.scala        | 14 ++++++++++++-
 .../preaaggregate/PreAggregateListeners.scala   | 11 +++++-----
 .../preaaggregate/PreAggregateUtil.scala        | 19 ++++++++++++++++--
 6 files changed, 79 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java 
b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
index 3aa4190..df712de 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -91,13 +91,13 @@ public class TimeSeriesUDF {
         calendar.set(Calendar.MILLISECOND, 0);
         calendar.set(Calendar.SECOND, 0);
         calendar.set(Calendar.MINUTE, 0);
-        calendar.set(Calendar.HOUR, 0);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
         calendar.set(Calendar.DAY_OF_MONTH, 1);
         break;
       case YEAR:
         calendar.set(Calendar.MONTH, 1);
         calendar.set(Calendar.DAY_OF_YEAR, 1);
-        calendar.set(Calendar.HOUR, 0);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
         calendar.set(Calendar.MINUTE, 0);
         calendar.set(Calendar.SECOND, 0);
         calendar.set(Calendar.MILLISECOND, 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark-common-test/src/test/resources/data_sort.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/data_sort.csv 
b/integration/spark-common-test/src/test/resources/data_sort.csv
new file mode 100644
index 0000000..dcf58a1
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/data_sort.csv
@@ -0,0 +1,21 @@
+3,1,Mikaa1,2015-01-01 11:00:00,2015-01-01 13:00:00,198,260
+3,3,Mikaa2,2015-01-02 12:00:00,2015-01-01 14:00:00,278,230
+3,2,Mikaa1,2015-01-03 13:00:00,2015-01-01 15:00:00,2556,1
+3,5,Mikaa2,2015-01-04 14:00:00,2015-01-01 16:00:00,640,254
+3,4,Mikaa,2015-01-05 15:00:00,2015-01-01 17:00:00,980,256
+2,10,Mikaa,2015-01-06 16:00:00,2015-01-01 18:00:00,1,2378
+2,1,Mikaa,2015-01-07 17:00:00,2015-01-01 19:00:00,96,234
+2,9,max,2015-01-08 18:00:00,2015-01-01 20:00:00,89,236
+2,10,max,2015-01-09 19:00:00,2015-01-01 21:00:00,198.36,239.2
+2,6,Mikaa,2015-01-10 20:00:00,2015-01-01 22:00:00,134.9,23.8
+2,10,Mikaa,2015-01-11 21:00:00,2015-01-01 23:00:00,156.5,252.8
+3,5,Mikaa,2015-01-11 22:00:00,2015-01-02 00:00:00,10.2,100.56
+13,4,Mikaa,2015-01-11 23:00:00,2015-01-02 00:00:00,10.2,100.56
+14,8,Mikaa,2015-01-12 00:00:00,2015-01-02 00:00:00,10.2,100.56
+15,1,Mikaa,2015-01-12 01:00:00,2015-01-02 00:00:00,10.2,100.56
+16,2,Mikaa,2015-01-12 02:00:00,2015-01-02 00:00:00,10.2,100.56
+2,6,Mikaa,2015-01-12 03:00:00,2015-01-02 00:00:00,10.2,100.56
+18,7,Mikaa,2015-01-12 04:00:00,2015-01-02 00:00:00,10.2,100.56
+19,5,Mikaa,2015-01-12 05:00:00,2015-01-02 00:00:00,10.2,100.56
+20,9,Mikaa,2015-01-12 06:00:00,2015-01-02 00:00:00,10.2,100.56
+3,17,Mikaa,2015-01-12 06:00:00,2015-01-02 00:00:00,10.2,100.56

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
index 6a0ea62..4aad06c 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -25,16 +25,34 @@ import org.scalatest.{BeforeAndAfterAll, Ignore}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
-@Ignore
 class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll: Unit = {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
     sql("drop table if exists mainTable")
+    sql("drop table if exists table_03")
     sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED 
BY 'org.apache.carbondata.format'")
     sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='mytime', 
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as 
select mytime, sum(age) from mainTable group by mytime")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")
+    sql("CREATE TABLE table_03 (imei string,age int,mac string,productdate 
timestamp,updatedate timestamp,gamePointId double,contractid double ) STORED BY 
'org.apache.carbondata.format'")
+    sql(s"LOAD DATA inpath '$resourcesPath/data_sort.csv' INTO table table_03 
options ('DELIMITER'=',', 
'QUOTECHAR'='','FILEHEADER'='imei,age,mac,productdate,updatedate,gamePointId,contractid')")
+    sql("create datamap ag1 on table table_03 using 'preaggregate' 
DMPROPERTIES ( 
'timeseries.eventtime'='productdate','timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1')as
 select productdate,mac,sum(age) from table_03 group by productdate,mac")
+
+  }
+  test("test Year level timeseries data validation1 ") {
+    checkAnswer( sql("select count(*) from table_03_ag1_year"),
+      Seq(Row(4)))
+  }
+
+  test("test month level timeseries data validation1 ") {
+    checkAnswer( sql("select count(*) from table_03_ag1_month"),
+      Seq(Row(4)))
+  }
+
+  test("test day level timeseries data validation1 ") {
+    checkAnswer( sql("select count(*) from table_03_ag1_day"),
+      Seq(Row(12)))
   }
 
   test("test Year level timeseries data validation") {
@@ -89,5 +107,6 @@ class TestTimeseriesDataLoad extends QueryTest with 
BeforeAndAfterAll {
 
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
+    sql("drop table if exists table_03")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 3e86233..8b11548 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import 
org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
 /**
@@ -151,12 +152,23 @@ case class CreatePreAggregateTableCommand(
     val loadAvailable = 
SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath)
       .nonEmpty
     if (loadAvailable) {
+      val updatedQuery = if (timeSeriesFunction.isDefined) {
+        val dataMap = 
parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala
+          .filter(p => p.getDataMapName
+            .equalsIgnoreCase(dataMapName)).head
+          .asInstanceOf[AggregationDataMapSchema]
+        
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema,
+          parentCarbonTable.getTableName,
+          parentCarbonTable.getDatabaseName)
+      } else {
+        queryString
+      }
       // Passing segmentToLoad as * because we want to load all the segments 
into the
       // pre-aggregate table even if the user has set some segments on the 
parent table.
       PreAggregateUtil.startDataLoadForDataMap(
           parentCarbonTable,
           tableIdentifier,
-          queryString,
+          updatedQuery,
           segmentToLoad = "*",
           validateSegments = true,
           isOverwrite = false,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 17e2f2b..fce32ab 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -62,17 +62,18 @@ object LoadPostAggregateListener extends 
OperationEventListener {
         } else {
           // for timeseries rollup policy
           val tableSelectedForRollup = 
PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
-            dataMapSchema)
+              dataMapSchema)
+          list += dataMapSchema
           // if non of the rollup data map is selected hit the maintable and 
prepare query
           if (tableSelectedForRollup.isEmpty) {
             
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
-              parentTableName,
-              databasename)
+                parentTableName,
+                databasename)
           } else {
             // otherwise hit the select rollup datamap schema
             
PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
-              tableSelectedForRollup.get,
-              databasename)
+                tableSelectedForRollup.get,
+                databasename)
           }
         }
         val isOverwrite =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index d77f2c2..cd19e3b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -669,6 +669,7 @@ object PreAggregateUtil {
     val groupingExpressions = 
scala.collection.mutable.ArrayBuffer.empty[String]
     val columns = tableSchema.getListOfColumns.asScala
       .filter(f => 
!f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+      .sortBy(_.getSchemaOrdinal)
     columns.foreach { a =>
       if (a.getAggFunction.nonEmpty) {
         aggregateColumns += s"${a.getAggFunction match {
@@ -681,13 +682,21 @@ object PreAggregateUtil {
             .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
               get(0).getColumnName).getColumnName
         } , '${ a.getTimeSeriesFunction }')"
+        aggregateColumns += s"timeseries(${
+          selectedDataMapSchema
+            .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
+              get(0).getColumnName).getColumnName
+        } , '${ a.getTimeSeriesFunction }')"
       } else {
         groupingExpressions += selectedDataMapSchema
           .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
             get(0).getColumnName).getColumnName
+        aggregateColumns += selectedDataMapSchema
+          .getNonAggChildColBasedByParent(a.getParentColumnTableRelations.
+            get(0).getColumnName).getColumnName
       }
     }
-    s"select ${ groupingExpressions.mkString(",") },${ 
aggregateColumns.mkString(",")
+    s"select ${ aggregateColumns.mkString(",")
     } from $databaseName.${selectedDataMapSchema.getChildSchema.getTableName } 
" +
     s"group by ${ groupingExpressions.mkString(",") }"
   }
@@ -707,6 +716,7 @@ object PreAggregateUtil {
     val groupingExpressions = 
scala.collection.mutable.ArrayBuffer.empty[String]
     val columns = tableSchema.getListOfColumns.asScala
       .filter(f => 
!f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+      .sortBy(_.getSchemaOrdinal)
     columns.foreach {a =>
         if (a.getAggFunction.nonEmpty) {
           aggregateColumns +=
@@ -715,11 +725,16 @@ object PreAggregateUtil {
           groupingExpressions +=
           s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName 
},'${
             a.getTimeSeriesFunction}')"
+          aggregateColumns +=
+          s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName 
},'${
+            a.getTimeSeriesFunction
+          }')"
         } else {
           groupingExpressions += 
a.getParentColumnTableRelations.get(0).getColumnName
+          aggregateColumns += 
a.getParentColumnTableRelations.get(0).getColumnName
         }
     }
-    s"select ${ groupingExpressions.mkString(",") },${
+    s"select ${
       aggregateColumns.mkString(",")
     } from $databaseName.${ parentTableName } group by ${ 
groupingExpressions.mkString(",") }"
 

Reply via email to