Repository: carbondata Updated Branches: refs/heads/master d3b228fb8 -> a9a0201b4
[CARBONDATA-2082] Timeseries pre-aggregate table should support the blank space Timeseries pre-aggregate table should support the blank space, including: event_time, different granularity This closes #1902 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a9a0201b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a9a0201b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a9a0201b Branch: refs/heads/master Commit: a9a0201b468505c79d1881607fb0673ee588d85a Parents: d3b228f Author: xubo245 <601450...@qq.com> Authored: Thu Feb 1 15:32:36 2018 +0800 Committer: kumarvishal <kumarvishal.1...@gmail.com> Committed: Fri Feb 2 18:38:44 2018 +0530 ---------------------------------------------------------------------- .../timeseries/TestTimeSeriesCreateTable.scala | 76 ++++++++++++++++++++ .../datamap/CarbonCreateDataMapCommand.scala | 17 +++-- .../command/timeseries/TimeSeriesUtil.scala | 11 ++- 3 files changed, 92 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9a0201b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala index b63fd53..f3bbcaf 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala 
@@ -368,6 +368,82 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.contains("identifier matching regex")) } + test("test timeseries create table 33: support event_time and granularity key with space") { + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + sql( + s"""CREATE DATAMAP agg1_month ON TABLE mainTable + |USING '$timeSeries' + |DMPROPERTIES ( + | ' event_time '='dataTime', + | ' MONTH_GRANULARITY '='1') + |AS SELECT dataTime, SUM(age) FROM mainTable + |GROUP BY dataTime + """.stripMargin) + checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), true, "maintable_agg1_month") + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + } + + + test("test timeseries create table 34: support event_time value with space") { + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + sql( + s"""CREATE DATAMAP agg1_month ON TABLE mainTable + |USING '$timeSeries' + |DMPROPERTIES ( + | 'event_time '=' dataTime', + | 'MONTH_GRANULARITY '='1') + |AS SELECT dataTime, SUM(age) FROM mainTable + |GROUP BY dataTime + """.stripMargin) + checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), true, "maintable_agg1_month") + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + } + + test("test timeseries create table 35: support granularity value with space") { + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + sql( + s"""CREATE DATAMAP agg1_month ON TABLE mainTable + |USING '$timeSeries' + |DMPROPERTIES ( + | 'event_time '='dataTime', + | 'MONTH_GRANULARITY '=' 1') + |AS SELECT dataTime, SUM(age) FROM mainTable + |GROUP BY dataTime + """.stripMargin) + checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), true, "maintable_agg1_month") + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + } + + test("test timeseries create table 36: support event_time and granularity value with space") { + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + sql( + s""" + | CREATE 
DATAMAP agg1_month ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime ', + | 'MONTH_GRANULARITY'=' 1 ') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + checkExistence(sql("SHOW DATAMAP ON TABLE maintable"), true, "maintable_agg1_month") + } + + test("test timeseries create table 37: unsupport event_time error value") { + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + intercept[NullPointerException] { + sql( + s"""CREATE DATAMAP agg1_month ON TABLE mainTable USING '$timeSeries' + |DMPROPERTIES ( + | 'event_time'='data Time', + | 'MONTH_GRANULARITY'='1') + |AS SELECT dataTime, SUM(age) FROM mainTable + |GROUP BY dataTime + """.stripMargin) + } + sql("DROP DATAMAP IF EXISTS agg1_month ON TABLE maintable") + } + override def afterAll: Unit = { sql("DROP TABLE IF EXISTS mainTable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9a0201b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index da20ac5..242087e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -35,7 +35,7 @@ case class CarbonCreateDataMapCommand( dataMapName: String, tableIdentifier: TableIdentifier, dmClassName: String, - dmproperties: Map[String, String], + dmProperties: Map[String, String], queryString: Option[String], ifNotExistsSet: Boolean = false) extends AtomicRunnableCommand { @@ -54,6 +54,12 @@ case class 
CarbonCreateDataMapCommand( val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = tableIdentifier.database.getOrElse("default") val tableName = tableIdentifier.table + "_" + dataMapName + val newDmProperties = if (dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).isDefined) { + dmProperties.updated(TimeSeriesUtil.TIMESERIES_EVENTTIME, + dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get.trim) + } else { + dmProperties + } if (sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { @@ -66,12 +72,11 @@ case class CarbonCreateDataMapCommand( } } else if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName) - + TimeSeriesUtil.validateTimeSeriesGranularity(newDmProperties, dmClassName) createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { val details = TimeSeriesUtil - .getTimeSeriesGranularityDetails(dmproperties, dmClassName) - val updatedDmProperties = dmproperties - details._1 + .getTimeSeriesGranularityDetails(newDmProperties, dmClassName) + val updatedDmProperties = newDmProperties - details._1 CreatePreAggregateTableCommand(dataMapName, tableIdentifier, dmClassName, @@ -84,7 +89,7 @@ case class CarbonCreateDataMapCommand( dataMapName, tableIdentifier, dmClassName, - dmproperties, + newDmProperties, queryString.get, ifNotExistsSet = ifNotExistsSet) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9a0201b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala 
index 987d4fe..45767da 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -46,7 +46,7 @@ object TimeSeriesUtil { if (!eventTime.isDefined) { throw new MalformedCarbonCommandException("event_time not defined in time series") } else { - val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, eventTime.get) + val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, eventTime.get.trim) if (carbonColumn.getDataType != DataTypes.TIMESTAMP) { throw new MalformedCarbonCommandException( "Timeseries event time is only supported on Timestamp " + @@ -110,7 +110,7 @@ object TimeSeriesUtil { val defaultValue = "1" for (granularity <- Granularity.values()) { if (dmProperties.get(granularity.getName).isDefined && - dmProperties.get(granularity.getName).get.equalsIgnoreCase(defaultValue)) { + dmProperties.get(granularity.getName).get.trim.equalsIgnoreCase(defaultValue)) { return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName).get) } } @@ -168,10 +168,9 @@ object TimeSeriesUtil { /** * Below method will be used to validate whether timeseries column present in * select statement or not - * @param fieldMapping - * fields from select plan - * @param timeSeriesColumn - * timeseries column name + * + * @param fieldMapping fields from select plan + * @param timeSeriesColumn timeseries column name */ def validateEventTimeColumnExitsInSelect(fieldMapping: scala.collection.mutable .LinkedHashMap[Field, DataMapField],