[CARBONDATA-2042][PreAggregate] Fixed data mismatch issue in case of timeseries
Problem: Year, Month, Day level timeseries table giving wrong result Solution: Timeseries UDF is not able to convert data when hour is in 24 hours format This closes #1820 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aac7af73 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aac7af73 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aac7af73 Branch: refs/heads/carbonstore Commit: aac7af7333aabd3b94e5e91c49f3f3d766103048 Parents: bc305c1 Author: kumarvishal <[email protected]> Authored: Wed Jan 17 14:34:56 2018 +0530 Committer: ravipesala <[email protected]> Committed: Fri Jan 19 12:34:19 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/preagg/TimeSeriesUDF.java | 4 ++-- .../src/test/resources/data_sort.csv | 21 ++++++++++++++++++++ .../timeseries/TestTimeseriesDataLoad.scala | 21 +++++++++++++++++++- .../CreatePreAggregateTableCommand.scala | 14 ++++++++++++- .../preaaggregate/PreAggregateListeners.scala | 11 +++++----- .../preaaggregate/PreAggregateUtil.scala | 19 ++++++++++++++++-- 6 files changed, 79 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java index 3aa4190..df712de 100644 --- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java +++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java @@ -91,13 +91,13 @@ public class TimeSeriesUDF { calendar.set(Calendar.MILLISECOND, 0); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MINUTE, 0); - 
calendar.set(Calendar.HOUR, 0); + calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.DAY_OF_MONTH, 1); break; case YEAR: calendar.set(Calendar.MONTH, 1); calendar.set(Calendar.DAY_OF_YEAR, 1); - calendar.set(Calendar.HOUR, 0); + calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark-common-test/src/test/resources/data_sort.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/data_sort.csv b/integration/spark-common-test/src/test/resources/data_sort.csv new file mode 100644 index 0000000..dcf58a1 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/data_sort.csv @@ -0,0 +1,21 @@ +3,1,Mikaa1,2015-01-01 11:00:00,2015-01-01 13:00:00,198,260 +3,3,Mikaa2,2015-01-02 12:00:00,2015-01-01 14:00:00,278,230 +3,2,Mikaa1,2015-01-03 13:00:00,2015-01-01 15:00:00,2556,1 +3,5,Mikaa2,2015-01-04 14:00:00,2015-01-01 16:00:00,640,254 +3,4,Mikaa,2015-01-05 15:00:00,2015-01-01 17:00:00,980,256 +2,10,Mikaa,2015-01-06 16:00:00,2015-01-01 18:00:00,1,2378 +2,1,Mikaa,2015-01-07 17:00:00,2015-01-01 19:00:00,96,234 +2,9,max,2015-01-08 18:00:00,2015-01-01 20:00:00,89,236 +2,10,max,2015-01-09 19:00:00,2015-01-01 21:00:00,198.36,239.2 +2,6,Mikaa,2015-01-10 20:00:00,2015-01-01 22:00:00,134.9,23.8 +2,10,Mikaa,2015-01-11 21:00:00,2015-01-01 23:00:00,156.5,252.8 +3,5,Mikaa,2015-01-11 22:00:00,2015-01-02 00:00:00,10.2,100.56 +13,4,Mikaa,2015-01-11 23:00:00,2015-01-02 00:00:00,10.2,100.56 +14,8,Mikaa,2015-01-12 00:00:00,2015-01-02 00:00:00,10.2,100.56 +15,1,Mikaa,2015-01-12 01:00:00,2015-01-02 00:00:00,10.2,100.56 +16,2,Mikaa,2015-01-12 02:00:00,2015-01-02 00:00:00,10.2,100.56 +2,6,Mikaa,2015-01-12 03:00:00,2015-01-02 00:00:00,10.2,100.56 +18,7,Mikaa,2015-01-12 04:00:00,2015-01-02 00:00:00,10.2,100.56 +19,5,Mikaa,2015-01-12 
05:00:00,2015-01-02 00:00:00,10.2,100.56 +20,9,Mikaa,2015-01-12 06:00:00,2015-01-02 00:00:00,10.2,100.56 +3,17,Mikaa,2015-01-12 06:00:00,2015-01-02 00:00:00,10.2,100.56 http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala index 6a0ea62..4aad06c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala @@ -25,16 +25,34 @@ import org.scalatest.{BeforeAndAfterAll, Ignore} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -@Ignore class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { override def beforeAll: Unit = { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) sql("drop table if exists mainTable") + sql("drop table if exists table_03") sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'") sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table 
mainTable") + sql("CREATE TABLE table_03 (imei string,age int,mac string,productdate timestamp,updatedate timestamp,gamePointId double,contractid double ) STORED BY 'org.apache.carbondata.format'") + sql(s"LOAD DATA inpath '$resourcesPath/data_sort.csv' INTO table table_03 options ('DELIMITER'=',', 'QUOTECHAR'='','FILEHEADER'='imei,age,mac,productdate,updatedate,gamePointId,contractid')") + sql("create datamap ag1 on table table_03 using 'preaggregate' DMPROPERTIES ( 'timeseries.eventtime'='productdate','timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1')as select productdate,mac,sum(age) from table_03 group by productdate,mac") + + } + test("test Year level timeseries data validation1 ") { + checkAnswer( sql("select count(*) from table_03_ag1_year"), + Seq(Row(4))) + } + + test("test month level timeseries data validation1 ") { + checkAnswer( sql("select count(*) from table_03_ag1_month"), + Seq(Row(4))) + } + + test("test day level timeseries data validation1 ") { + checkAnswer( sql("select count(*) from table_03_ag1_day"), + Seq(Row(12))) } test("test Year level timeseries data validation") { @@ -89,5 +107,6 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { override def afterAll: Unit = { sql("drop table if exists mainTable") + sql("drop table if exists table_03") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 3e86233..8b11548 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.statusmanager.SegmentStatusManager /** @@ -151,12 +152,23 @@ case class CreatePreAggregateTableCommand( val loadAvailable = SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath) .nonEmpty if (loadAvailable) { + val updatedQuery = if (timeSeriesFunction.isDefined) { + val dataMap = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala + .filter(p => p.getDataMapName + .equalsIgnoreCase(dataMapName)).head + .asInstanceOf[AggregationDataMapSchema] + PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, + parentCarbonTable.getTableName, + parentCarbonTable.getDatabaseName) + } else { + queryString + } // Passing segmentToLoad as * because we want to load all the segments into the // pre-aggregate table even if the user has set some segments on the parent table. 
PreAggregateUtil.startDataLoadForDataMap( parentCarbonTable, tableIdentifier, - queryString, + updatedQuery, segmentToLoad = "*", validateSegments = true, isOverwrite = false, http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 17e2f2b..fce32ab 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -62,17 +62,18 @@ object LoadPostAggregateListener extends OperationEventListener { } else { // for timeseries rollup policy val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list, - dataMapSchema) + dataMapSchema) + list += dataMapSchema // if non of the rollup data map is selected hit the maintable and prepare query if (tableSelectedForRollup.isEmpty) { PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema, - parentTableName, - databasename) + parentTableName, + databasename) } else { // otherwise hit the select rollup datamap schema PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema, - tableSelectedForRollup.get, - databasename) + tableSelectedForRollup.get, + databasename) } } val isOverwrite = http://git-wip-us.apache.org/repos/asf/carbondata/blob/aac7af73/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index d77f2c2..cd19e3b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -669,6 +669,7 @@ object PreAggregateUtil { val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] val columns = tableSchema.getListOfColumns.asScala .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) + .sortBy(_.getSchemaOrdinal) columns.foreach { a => if (a.getAggFunction.nonEmpty) { aggregateColumns += s"${a.getAggFunction match { @@ -681,13 +682,21 @@ object PreAggregateUtil { .getNonAggChildColBasedByParent(a.getParentColumnTableRelations. get(0).getColumnName).getColumnName } , '${ a.getTimeSeriesFunction }')" + aggregateColumns += s"timeseries(${ + selectedDataMapSchema + .getNonAggChildColBasedByParent(a.getParentColumnTableRelations. + get(0).getColumnName).getColumnName + } , '${ a.getTimeSeriesFunction }')" } else { groupingExpressions += selectedDataMapSchema .getNonAggChildColBasedByParent(a.getParentColumnTableRelations. get(0).getColumnName).getColumnName + aggregateColumns += selectedDataMapSchema + .getNonAggChildColBasedByParent(a.getParentColumnTableRelations. 
+ get(0).getColumnName).getColumnName } } - s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",") + s"select ${ aggregateColumns.mkString(",") } from $databaseName.${selectedDataMapSchema.getChildSchema.getTableName } " + s"group by ${ groupingExpressions.mkString(",") }" } @@ -707,6 +716,7 @@ object PreAggregateUtil { val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] val columns = tableSchema.getListOfColumns.asScala .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) + .sortBy(_.getSchemaOrdinal) columns.foreach {a => if (a.getAggFunction.nonEmpty) { aggregateColumns += @@ -715,11 +725,16 @@ object PreAggregateUtil { groupingExpressions += s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${ a.getTimeSeriesFunction}')" + aggregateColumns += + s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${ + a.getTimeSeriesFunction + }')" } else { groupingExpressions += a.getParentColumnTableRelations.get(0).getColumnName + aggregateColumns += a.getParentColumnTableRelations.get(0).getColumnName } } - s"select ${ groupingExpressions.mkString(",") },${ + s"select ${ aggregateColumns.mkString(",") } from $databaseName.${ parentTableName } group by ${ groupingExpressions.mkString(",") }"
