Repository: carbondata Updated Branches: refs/heads/master 0e8707a60 -> 49763b72b
[CARBONDATA-1518][Pre-Aggregate]Support creating timeseries while creating main table. This closes #1565 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/49763b72 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/49763b72 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/49763b72 Branch: refs/heads/master Commit: 49763b72bce8f38404e693b39bb440acb04e601f Parents: 0e8707a Author: kumarvishal <[email protected]> Authored: Tue Dec 5 16:00:48 2017 +0530 Committer: ravipesala <[email protected]> Committed: Thu Dec 7 08:29:01 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 + .../ThriftWrapperSchemaConverterImpl.java | 15 +- .../schema/table/column/ColumnSchema.java | 20 +++ .../core/preagg/TimeSeriesFunction.java | 40 +++++ .../carbondata/core/preagg/TimeSeriesUDF.java | 127 +++++++++++++++ .../util/AbstractDataFileFooterConverter.java | 11 +- .../timeseries/TestTimeSeriesCreateTable.scala | 93 +++++++++++ .../command/carbonTableSchemaCommon.scala | 2 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 6 + .../datamap/CarbonCreateDataMapCommand.scala | 33 +++- .../CreatePreAggregateTableCommand.scala | 13 +- .../preaaggregate/PreAggregateUtil.scala | 2 +- .../command/timeseries/TimeseriesUtil.scala | 159 +++++++++++++++++++ 13 files changed, 513 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 43985b2..72d8b0c 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1440,6 +1440,10 @@ public final class CarbonCommonConstants { */ public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024; + public static final String TIMESERIES_EVENTTIME = "timeseries.eventtime"; + + public static final String TIMESERIES_HIERARCHY = "timeseries.hierarchy"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java index 8a24e38..c1e68da 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java @@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.TableSchema; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation; +import org.apache.carbondata.core.preagg.TimeSeriesUDF; /** * Thrift schema to carbon schema converter and vice versa @@ -198,6 +199,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { thriftColumnSchema.setColumnProperties(properties); } thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getAggFunction()); + if (null != wrapperColumnSchema.getTimeSeriesFunction() && !wrapperColumnSchema
.getTimeSeriesFunction().isEmpty()) { + thriftColumnSchema.setAggregate_function(wrapperColumnSchema.getTimeSeriesFunction()); + } List<ParentColumnTableRelation> parentColumnTableRelations = wrapperColumnSchema.getParentColumnTableRelations(); if (null != parentColumnTableRelations) { @@ -518,7 +523,16 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { wrapperColumnSchema.setSortColumn(true); } } - wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function()); + // aggregate_function is optional in the thrift schema, so only classify it when it is set + if (null != externalColumnSchema.getAggregate_function()) { + if (TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION + .contains(externalColumnSchema.getAggregate_function().toLowerCase())) { + wrapperColumnSchema + .setTimeSeriesFunction(externalColumnSchema.getAggregate_function().toLowerCase()); + } else { + wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function()); + } + } List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation = externalColumnSchema.getParentColumnTableRelations(); if (null != parentColumnTableRelation) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java index ea7005f..edae4d7 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java @@ -126,9 +126,17 @@ public class ColumnSchema implements Serializable, Writable { */ private String aggFunction = ""; + /** + * list of parent column relations + */ private List<ParentColumnTableRelation> parentColumnTableRelations;
/** + * timeseries function applied on column + */ + private String timeSeriesFunction = ""; + + /** * @return the columnName */ public String getColumnName() { @@ -439,6 +447,16 @@ public class ColumnSchema implements Serializable, Writable { this.aggFunction = aggFunction; } + public String getTimeSeriesFunction() { + return timeSeriesFunction; + } + + public void setTimeSeriesFunction(String timeSeriesFunction) { + if (null != timeSeriesFunction) { + this.timeSeriesFunction = timeSeriesFunction; + } + } + @Override public void write(DataOutput out) throws IOException { out.writeShort(dataType.getId()); @@ -476,6 +494,7 @@ public class ColumnSchema implements Serializable, Writable { out.writeBoolean(invisible); out.writeBoolean(isSortColumn); out.writeUTF(null != aggFunction ? aggFunction : ""); + out.writeUTF(timeSeriesFunction); boolean isParentTableColumnRelationExists = null != parentColumnTableRelations && parentColumnTableRelations.size() > 0; out.writeBoolean(isParentTableColumnRelationExists); @@ -521,6 +540,7 @@ public class ColumnSchema implements Serializable, Writable { this.invisible = in.readBoolean(); this.isSortColumn = in.readBoolean(); this.aggFunction = in.readUTF(); + this.timeSeriesFunction = in.readUTF(); boolean isParentTableColumnRelationExists = in.readBoolean(); if (isParentTableColumnRelationExists) { short parentColumnTableRelationSize = in.readShort(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java new file mode 100644 index 0000000..02ff753 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.preagg; + +/** + * Supported timeseries granularities, declared from finest (second) to coarsest (year) + */ +public enum TimeSeriesFunction { + SECOND("second"), + MINUTE("minute"), + HOUR("hour"), + DAY("day"), + MONTH("month"), + YEAR("year"); + + private final String name; + + TimeSeriesFunction(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java new file mode 100644 index 0000000..50cb052 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.preagg; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Locale; + +/** + * class for applying timeseries udf + */ +public class TimeSeriesUDF { + + public final List<String> TIMESERIES_FUNCTION = new ArrayList<>(); + + // thread local for keeping calender instance + private final ThreadLocal<Calendar> calanderThreadLocal = new ThreadLocal<>(); + + /** + * singleton instance + */ + public static final TimeSeriesUDF INSTANCE = new TimeSeriesUDF(); + + private TimeSeriesUDF() { + initialize(); + } + + /** + * Below method will be used to apply udf on data provided + * Truncation is done in the JVM default time zone, as shown below.
+ * Data: 2016-7-23 01:01:30,10 + * Year Level UDF will return: 2016-1-1 00:00:00,0 + * Month Level UDF will return: 2016-7-1 00:00:00,0 + * Day Level UDF will return: 2016-7-23 00:00:00,0 + * Hour Level UDF will return: 2016-7-23 01:00:00,0 + * Minute Level UDF will return: 2016-7-23 01:01:00,0 + * Second Level UDF will return: 2016-7-23 01:01:30,0 + * If function (matched case-insensitively) does not match any of the above + * functions it will throw IllegalArgumentException + * + * @param data timestamp data + * @param function time series function name + * @return new timestamp truncated to the requested level (data itself is not modified) + */ + public Timestamp applyUDF(Timestamp data, String function) { + if (null == data) { + return data; + } + initialize(); + Calendar calendar = calanderThreadLocal.get(); + calendar.clear(); + calendar.setTimeInMillis(data.getTime()); + TimeSeriesFunction tsFunction = TimeSeriesFunction.valueOf(function.toUpperCase(Locale.ROOT)); + switch (tsFunction) { + case SECOND: + calendar.set(Calendar.MILLISECOND, 0); + break; + case MINUTE: + calendar.set(Calendar.MILLISECOND, 0); + calendar.set(Calendar.SECOND, 0); + break; + case HOUR: + calendar.set(Calendar.MILLISECOND, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MINUTE, 0); + break; + case DAY: + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + break; + case MONTH: + calendar.set(Calendar.MILLISECOND, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.HOUR_OF_DAY, 0); // 24h field; Calendar.HOUR would keep PM + calendar.set(Calendar.DAY_OF_MONTH, 1); + break; + case YEAR: + calendar.set(Calendar.MONTH, Calendar.JANUARY); + calendar.set(Calendar.DAY_OF_YEAR, 1); + calendar.set(Calendar.HOUR_OF_DAY, 0); // 24h field; Calendar.HOUR would keep PM + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + break; + default: + throw new IllegalArgumentException("Invalid timeseries function name: " + function); + }
return new Timestamp(calendar.getTimeInMillis()); + } + + /** + * Below method will be used to initialize the thread local + */ + private synchronized void initialize() { + if (calanderThreadLocal.get() == null) { + calanderThreadLocal.set(new GregorianCalendar()); + } + if (TIMESERIES_FUNCTION.isEmpty()) { + TIMESERIES_FUNCTION.add("second"); + TIMESERIES_FUNCTION.add("minute"); + TIMESERIES_FUNCTION.add("hour"); + TIMESERIES_FUNCTION.add("day"); + TIMESERIES_FUNCTION.add("month"); + TIMESERIES_FUNCTION.add("year"); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java index b9ec3f1..f65e98d 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java @@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation; +import org.apache.carbondata.core.preagg.TimeSeriesUDF; import org.apache.carbondata.core.reader.CarbonIndexFileReader; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.BlockIndex; @@ -289,7 +290,15 @@ public abstract class AbstractDataFileFooterConverter { wrapperColumnSchema.setSortColumn(true); } } - wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function()); + if (null != externalColumnSchema.getAggregate_function()) { + if
(TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION + .contains(externalColumnSchema.getAggregate_function().toLowerCase())) { + wrapperColumnSchema + .setTimeSeriesFunction(externalColumnSchema.getAggregate_function().toLowerCase()); + } else { + wrapperColumnSchema.setAggFunction(externalColumnSchema.getAggregate_function()); + } + } List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation = externalColumnSchema.getParentColumnTableRelations(); if (null != parentColumnTableRelation) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala new file mode 100644 index 0000000..b60e487 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala @@ -0,0 +1,73 @@ +package org.apache.carbondata.integration.spark.testsuite.timeseries + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + sql("drop table if exists mainTable") + sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED BY 'org.apache.carbondata.format'") + sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='second=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime") + } + 
test("test timeseries create table Zero") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_second"), true, "maintable_agg0_second") + sql("drop datamap agg0_second on table mainTable") + } + + test("test timeseries create table One") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_hour"), true, "maintable_agg0_hour") + sql("drop datamap agg0_hour on table mainTable") + } + test("test timeseries create table two") { + checkExistence(sql("DESCRIBE FORMATTED maintable_agg0_day"), true, "maintable_agg0_day") + sql("drop datamap agg0_day on table mainTable") + } + test("test timeseries create table three") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_month"), true, "maintable_agg0_month") + sql("drop datamap agg0_month on table mainTable") + } + test("test timeseries create table four") { + checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_year"), true, "maintable_agg0_year") + sql("drop datamap agg0_year on table mainTable") + } + + test("test timeseries create table five") { + intercept[Exception] { + sql( + "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='sec=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime") + } + } + + test("test timeseries create table Six") { + intercept[Exception] { + sql( + "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='hour=2') as select dataTime, sum(age) from mainTable group by dataTime") + } + } + + test("test timeseries create table seven") { + intercept[Exception] { + sql( + "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') as select dataTime, sum(age) from mainTable group by dataTime") + 
} + } + + test("test timeseries create table Eight") { + intercept[Exception] { + sql( + "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='name', 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') as select name, sum(age) from mainTable group by name") + } + } + + test("test timeseries create table Nine") { + intercept[Exception] { + sql( + "create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') as select name, sum(age) from mainTable group by name") + } + } + override def afterAll: Unit = { + sql("drop table if exists mainTable") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 44f577d..37663ea 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -76,7 +76,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri override def hashCode : Int = column.hashCode } -case class DataMapField(aggregateFunction: String = "", +case class DataMapField(var aggregateFunction: String = "", columnTableRelation: Option[ColumnTableRelation] = None) { } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 53b20c2..d68bc41 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql +import java.sql.Timestamp import java.util.concurrent.ConcurrentHashMap import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.hive._ import org.apache.carbondata.common.logging.LogServiceFactory @@ -61,6 +63,10 @@ class CarbonEnv { // only then the CarbonPreAggregateDataLoadingRules would be applied to split the average // column to sum and count.
sparkSession.udf.register("preAggLoad", () => "") + + // timeseries(ts, fn) truncates ts to fn: second, minute, hour, day, month or year + sparkSession.udf.register("timeseries", (timestamp: Timestamp, timeSeriesFunction: String) => + TimeSeriesUtil.timeSeriesUDF(timestamp, timeSeriesFunction)) synchronized { if (!initialized) { // update carbon session parameters , preserve thread parameters http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index f90abb8..a3aa36d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -22,8 +22,10 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil} +import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -50,13 +52,29 @@ case class CarbonCreateDataMapCommand( val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
dmClassName.equalsIgnoreCase("preaggregate")) { - CreatePreAggregateTableCommand( - dataMapName, - tableIdentifier, - dmClassName, - dmproperties, - queryString.get - ).processMetadata(sparkSession) + val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY) + if (timeHierarchyString.isDefined) { + val details = TimeSeriesUtil + .validateAndGetTimeSeriesHierarchyDetails( + timeHierarchyString.get) + val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY + details.foreach { case (granularity, _) => + CreatePreAggregateTableCommand(dataMapName + '_' + granularity, + tableIdentifier, + dmClassName, + updatedDmProperties, + queryString.get, + Some(granularity)).run(sparkSession) + } + } else { + CreatePreAggregateTableCommand( + dataMapName, + tableIdentifier, + dmClassName, + dmproperties, + queryString.get + ).processMetadata(sparkSession) + } } else { val dataMapSchema = new DataMapSchema(dataMapName, dmClassName) dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 1ebf511..1c23d3a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -44,7 +45,8 @@ case class CreatePreAggregateTableCommand( parentTableIdentifier: TableIdentifier, dmClassName: String, dmProperties: Map[String, String], - queryString: String) + queryString: String, + timeSeriesFunction: Option[String] = None) extends AtomicRunnableCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { @@ -74,6 +76,15 @@ case class CreatePreAggregateTableCommand( // updating the relation identifier, this will be stored in child table // which can be used during dropping of pre-aggreate table as parent table will // also get updated + if (timeSeriesFunction.isDefined) { + TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable) + TimeSeriesUtil.validateEventTimeColumnExitsInSelect( + fieldRelationMap, + dmProperties(CarbonCommonConstants.TIMESERIES_EVENTTIME)) + TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap, + dmProperties(CarbonCommonConstants.TIMESERIES_EVENTTIME), + timeSeriesFunction.get) + } tableModel.parentTable = Some(parentTable) tableModel.dataMapRelation = Some(fieldRelationMap) val tablePath = http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 95a711e..c602b0a 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -237,7 +237,7 @@ object PreAggregateUtil { parentTableName, parentDatabaseName, parentTableId = parentTableId) case Average(attr: AttributeReference) => - getField(attr.name, + list += getField(attr.name, attr.dataType, "sum", carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, http://git-wip-us.apache.org/repos/asf/carbondata/blob/49763b72/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala new file mode 100644 index 0000000..9d4ce56 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.sql.execution.command.timeseries + +import java.sql.Timestamp + +import org.apache.spark.sql.execution.command.{DataMapField, Field} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.preagg.TimeSeriesUDF +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException + +/** + * Utility methods for creating time series pre-aggregate datamaps + */ +object TimeSeriesUtil { + + /** + * Below method will be used to validate whether column mentioned in time series + * is timestamp column or not + * + * @param dmproperties + * data map properties + * @param parentTable + * parent table + * @throws MalformedCarbonCommandException if event time is missing, unknown or not a timestamp + */ + def validateTimeSeriesEventTime(dmproperties: Map[String, String], + parentTable: CarbonTable): Unit = { + val eventTime = dmproperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME) + if (eventTime.isEmpty) { + throw new MalformedCarbonCommandException("Eventtime not defined in time series") + } else { + val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, eventTime.get) + if (null == carbonColumn || carbonColumn.getDataType != DataTypes.TIMESTAMP) { + throw new MalformedCarbonCommandException( + "Timeseries event time is only supported on Timestamp " + + "column") + } + } + } + + /** + * Validates a hierarchy such as 'second=1,hour=1,day=1': each entry must name a + * supported granularity with level 1, in strictly increasing (finest to coarsest) order + * + * @param timeSeriesHierarchyDetails + * time series hierarchy string + * @return (granularity, level) pairs in the order given + */ + def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[ + (String, String)] = { + val updatedtimeSeriesHierarchyDetails = timeSeriesHierarchyDetails.toLowerCase + val timeSeriesHierarchy = updatedtimeSeriesHierarchyDetails.split(",") + val hierBuffer = timeSeriesHierarchy.map {
+ case f => + val splits = f.split("=").map(_.trim) + // checking hierarchy name is valid or not + if (!TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION.contains(splits(0).toLowerCase)) { + throw new MalformedCarbonCommandException(s"Not supported heirarchy type: ${ splits(0) }") + + } + // validating hierarchy level: exactly one '=' and the level must be 1 + if (splits.length != 2 || !splits(1).equals("1")) { + throw new MalformedCarbonCommandException( + s"Unsupported Value for hierarchy:" + + s"${ splits.mkString("=") }") + } + (splits(0), splits(1)) + } + if (hierBuffer.isEmpty) { + throw new MalformedCarbonCommandException("Timeseries hierarchy is empty") + } + // checking whether hierarchy is in proper order or not: each granularity must be + // strictly coarser than the previous one, which also rejects duplicate entries + var previousIndex = -1 + for (index <- hierBuffer.indices) { + val currentIndex = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION + .indexOf(hierBuffer(index)._1.toLowerCase) + if (currentIndex <= previousIndex) { + throw new MalformedCarbonCommandException(s"$timeSeriesHierarchyDetails is in wrong order") + } + previousIndex = currentIndex + } + hierBuffer + } + + /** + * Below method will be used to validate whether timeseries column present in + * select statement or not + * @param fieldMapping + * fields from select plan + * @param timeSeriesColumn + * timeseries column name + */ + def validateEventTimeColumnExitsInSelect(fieldMapping: scala.collection.mutable + .LinkedHashMap[Field, DataMapField], + timeSeriesColumn: String) : Unit = { + val isTimeSeriesColumnExits = fieldMapping + .exists(obj => obj._2.columnTableRelation.isDefined && + obj._2.columnTableRelation.get.parentColumnName + .equalsIgnoreCase(timeSeriesColumn) && + obj._2.aggregateFunction.isEmpty) + if (!isTimeSeriesColumnExits) { + throw new MalformedCarbonCommandException(s"Time series column ${ timeSeriesColumn } does " + + s"not exists in select") + } + } + + /**
+ * Sets the timeseries function as aggregate function of the event time column + * @param fieldMapping + * fields from select plan + * @param timeSeriesColumn + * timeseries column name + * @param timeSeriesFunction timeseries function (e.g. hour, day) to apply + */ + def updateTimeColumnSelect(fieldMapping: scala.collection.mutable + .LinkedHashMap[Field, DataMapField], + timeSeriesColumn: String, + timeSeriesFunction: String) : Unit = { + val eventTimeField = fieldMapping + .find(obj => obj._2.columnTableRelation.isDefined && + obj._2.columnTableRelation.get.parentColumnName + .equalsIgnoreCase(timeSeriesColumn) && + obj._2.aggregateFunction.isEmpty) + eventTimeField.get._2.aggregateFunction = timeSeriesFunction + } + + /** + * UDF for timeseries + * + * @param timestamp + * timestamp + * @param timeSeriesFunctionType + * time series function + * @return updated timestamp based on function + */ + def timeSeriesUDF(timestamp: Timestamp, timeSeriesFunctionType: String): Timestamp = { + TimeSeriesUDF.INSTANCE.applyUDF(timestamp, timeSeriesFunctionType) + } +} +
