[CARBONDATA-1519][PreAgg-Timeseries] Support Query and Load on timeseries table
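
With this change, a pre-aggregate datamap declared with timeseries DMPROPERTIES creates one child table per hierarchy level (second through year), data loads can be rolled up from finer levels, and queries using the timeseries() UDF are rewritten to the matching child table. A minimal usage sketch, mirroring the DDL and query syntax in the new tests (table, column and path names are illustrative only):

    sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) " +
      "STORED BY 'org.apache.carbondata.format'")
    sql("create datamap agg0 on table mainTable using 'preaggregate' " +
      "DMPROPERTIES ('timeseries.eventTime'='mytime', " +
      "'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') " +
      "as select mytime, sum(age) from mainTable group by mytime")
    // loads the main table and, level by level, the child tables
    // maintable_agg0_second ... maintable_agg0_year
    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
    // expected to be rewritten to maintable_agg0_hour by the pre-aggregate query rules
    sql("select timeseries(mytime,'hour'), sum(age) from mainTable " +
      "group by timeseries(mytime,'hour')")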
This closes #1626 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e2a79eeb Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e2a79eeb Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e2a79eeb Branch: refs/heads/master Commit: e2a79eebbcbe641f657e84ccf86a9a7e84e8e735 Parents: 0da0a4f Author: kumarvishal <[email protected]> Authored: Tue Dec 5 20:56:58 2017 +0530 Committer: ravipesala <[email protected]> Committed: Fri Dec 8 04:09:25 2017 +0530 ---------------------------------------------------------------------- .../schema/table/AggregationDataMapSchema.java | 129 +++++++++- .../core/preagg/AggregateTableSelector.java | 31 ++- .../carbondata/core/preagg/QueryColumn.java | 20 +- .../core/preagg/TimeSeriesFunction.java | 40 ---- .../core/preagg/TimeSeriesFunctionEnum.java | 53 +++++ .../carbondata/core/preagg/TimeSeriesUDF.java | 5 +- .../apache/carbondata/core/util/CarbonUtil.java | 32 +++ .../src/test/resources/timeseriestest.csv | 7 + .../TestPreAggregateTableSelection.scala | 1 - .../timeseries/TestTimeSeriesCreateTable.scala | 18 +- .../timeseries/TestTimeseriesDataLoad.scala | 79 +++++++ .../TestTimeseriesTableSelection.scala | 131 +++++++++++ .../carbondata/spark/util/CarbonSparkUtil.scala | 7 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +- .../apache/spark/sql/CarbonExpressions.scala | 16 +- .../preaaggregate/PreAggregateListeners.scala | 65 +++-- .../preaaggregate/PreAggregateUtil.scala | 105 ++++++++- .../command/timeseries/TimeSeriesFunction.scala | 33 +++ .../command/timeseries/TimeseriesUtil.scala | 15 -- .../spark/sql/hive/CarbonFileMetastore.scala | 5 +- .../sql/hive/CarbonPreAggregateRules.scala | 235 ++++++++++++++----- .../src/main/spark2.1/CarbonSessionState.scala | 4 +- .../src/main/spark2.2/CarbonSessionState.scala | 4 +- 23 files changed, 885 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java index 9bfb22c..8f6a2d3 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation; +import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum; /** * data map schema class for pre aggregation @@ -50,6 +51,17 @@ public class AggregationDataMapSchema extends DataMapSchema { */ private Map<String, Set<String>> parentColumnToAggregationsMapping; + /** + * whether its a timeseries data map + */ + private boolean isTimeseriesDataMap; + + /** + * below ordinal will be used during sorting the data map + * to support rollup for loading + */ + private int ordinal = Integer.MAX_VALUE; + public AggregationDataMapSchema(String dataMapName, String className) { super(dataMapName, className); } @@ -63,6 +75,28 @@ public class 
AggregationDataMapSchema extends DataMapSchema { } /** + * Below method will be used to get the columns on which aggregate function + * and time series function is not applied + * @param columnName + * parent column name + * @return child column schema + */ + public ColumnSchema getNonAggNonTimeseriesChildColBasedByParent(String columnName) { + Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName); + if (null != columnSchemas) { + Iterator<ColumnSchema> iterator = columnSchemas.iterator(); + while (iterator.hasNext()) { + ColumnSchema next = iterator.next(); + if ((null == next.getAggFunction() || next.getAggFunction().isEmpty()) && null == next + .getTimeSeriesFunction() || next.getTimeSeriesFunction().isEmpty()) { + return next; + } + } + } + return null; + } + + /** * Below method will be used to get the columns on which aggregate function is not applied * @param columnName * parent column name @@ -74,7 +108,28 @@ public class AggregationDataMapSchema extends DataMapSchema { Iterator<ColumnSchema> iterator = columnSchemas.iterator(); while (iterator.hasNext()) { ColumnSchema next = iterator.next(); - if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) { + if ((null == next.getAggFunction() || next.getAggFunction().isEmpty())) { + return next; + } + } + } + return null; + } + + /** + * Below method will be used to get the columns on which aggregate function is not applied + * + * @param columnName parent column name + * @return child column schema + */ + public ColumnSchema getTimeseriesChildColBasedByParent(String columnName, + String timeseriesFunction) { + Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName); + if (null != columnSchemas) { + Iterator<ColumnSchema> iterator = columnSchemas.iterator(); + while (iterator.hasNext()) { + ColumnSchema next = iterator.next(); + if (timeseriesFunction.equals(next.getTimeSeriesFunction())) { return next; } } @@ -126,6 +181,28 @@ public class AggregationDataMapSchema extends DataMapSchema { } /** + * Below method will be used to get the column schema based on parent column name + * @param columName + * parent column name + * @param timeseriesFunction + * timeseries function applied on column + * @return child column schema + */ + public ColumnSchema getTimeseriesChildColByParent(String columName, String timeseriesFunction) { + List<ColumnSchema> listOfColumns = childSchema.getListOfColumns(); + for (ColumnSchema columnSchema : listOfColumns) { + List<ParentColumnTableRelation> parentColumnTableRelations = + columnSchema.getParentColumnTableRelations(); + if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1 + && parentColumnTableRelations.get(0).getColumnName().equals(columName) + && timeseriesFunction.equalsIgnoreCase(columnSchema.getTimeSeriesFunction())) { + return columnSchema; + } + } + return null; + } + + /** * Below method is to check if parent column with matching aggregate function * @param parentColumnName * parent column name @@ -175,6 +252,15 @@ public class AggregationDataMapSchema extends DataMapSchema { private void fillNonAggFunctionColumns(List<ColumnSchema> listOfColumns) { parentToNonAggChildMapping = new HashMap<>(); for (ColumnSchema column : listOfColumns) { + if (!isTimeseriesDataMap) { + isTimeseriesDataMap = + null != column.getTimeSeriesFunction() && !column.getTimeSeriesFunction().isEmpty(); + if (isTimeseriesDataMap) { + this.ordinal = + TimeSeriesFunctionEnum.valueOf(column.getTimeSeriesFunction().toUpperCase()) + 
.getOrdinal(); + } + } if (null == column.getAggFunction() || column.getAggFunction().isEmpty()) { fillMappingDetails(column, parentToNonAggChildMapping); } @@ -210,4 +296,45 @@ public class AggregationDataMapSchema extends DataMapSchema { } } + public boolean isTimeseriesDataMap() { + return isTimeseriesDataMap; + } + + /** + * Below method is to support rollup during loading the data in pre aggregate table + * In case of timeseries year level table data loading can be done using month level table or any + * time series level below year level for example day,hour minute, second. + * @TODO need to handle for pre aggregate table without timeseries + * + * @param aggregationDataMapSchema + * @return whether aggregation data map can be selected or not + */ + public boolean canSelectForRollup(AggregationDataMapSchema aggregationDataMapSchema) { + List<ColumnSchema> listOfColumns = childSchema.getListOfColumns(); + for (ColumnSchema column : listOfColumns) { + List<ParentColumnTableRelation> parentColumnTableRelations = + column.getParentColumnTableRelations(); + //@TODO handle scenario when aggregate datamap columns is derive from multiple column + // which is not supported currently + if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) { + if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) { + if (null == aggregationDataMapSchema + .getAggChildColByParent(parentColumnTableRelations.get(0).getColumnName(), + column.getAggFunction())) { + return false; + } + } else { + if (null == aggregationDataMapSchema.getNonAggChildColBasedByParent( + parentColumnTableRelations.get(0).getColumnName())) { + return false; + } + } + } + } + return true; + } + + public int getOrdinal() { + return ordinal; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java index 8b87a1a..5347567 100644 --- a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java +++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java @@ -70,8 +70,8 @@ public class AggregateTableSelector { AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema; isMatch = true; for (QueryColumn queryColumn : projectionColumn) { - ColumnSchema columnSchemaByParentName = aggregationDataMapSchema - .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName()); + ColumnSchema columnSchemaByParentName = + getColumnSchema(queryColumn, aggregationDataMapSchema); if (null == columnSchemaByParentName) { isMatch = false; } @@ -95,8 +95,8 @@ public class AggregateTableSelector { isMatch = true; for (QueryColumn queryColumn : filterColumns) { AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema; - ColumnSchema columnSchemaByParentName = aggregationDataMapSchema - .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName()); + ColumnSchema columnSchemaByParentName = + getColumnSchema(queryColumn, aggregationDataMapSchema); if (null == columnSchemaByParentName) { isMatch = false; } @@ -132,4 +132,27 @@ public class AggregateTableSelector { } return selectedDataMapSchema; } + + /** + * Below method will be used to get column 
schema for projection and + * filter query column + * + * @param queryColumn query column + * @param aggregationDataMapSchema selected data map schema + * @return column schema + */ + private ColumnSchema getColumnSchema(QueryColumn queryColumn, + AggregationDataMapSchema aggregationDataMapSchema) { + ColumnSchema columnSchemaByParentName = null; + if (!queryColumn.getTimeseriesFunction().isEmpty()) { + columnSchemaByParentName = aggregationDataMapSchema + .getTimeseriesChildColBasedByParent(queryColumn.getColumnSchema().getColumnName(), + queryColumn.getTimeseriesFunction()); + } else { + columnSchemaByParentName = aggregationDataMapSchema + .getNonAggNonTimeseriesChildColBasedByParent( + queryColumn.getColumnSchema().getColumnName()); + } + return columnSchemaByParentName; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java index c889716..c91a703 100644 --- a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java +++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java @@ -44,12 +44,18 @@ public class QueryColumn { */ private boolean isFilterColumn; + /** + * timeseries udf applied on column + */ + private String timeseriesFunction; + public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction, - boolean isFilterColumn) { + boolean isFilterColumn, String timeseriesFunction) { this.columnSchema = columnSchema; this.changedDataType = changedDataType; this.aggFunction = aggFunction; this.isFilterColumn = isFilterColumn; + this.timeseriesFunction = timeseriesFunction; } public ColumnSchema getColumnSchema() { @@ -68,6 +74,10 @@ public class QueryColumn { return isFilterColumn; } + public String getTimeseriesFunction() { + return timeseriesFunction; + } + @Override public boolean equals(Object o) { if (this == o) { return true; @@ -82,12 +92,18 @@ public class QueryColumn { if (!columnSchema.equals(that.columnSchema)) { return false; } - return aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null; + if (!(aggFunction != null ? aggFunction.equals(that.aggFunction) : that.aggFunction == null)) { + return false; + } + return timeseriesFunction != null ? + timeseriesFunction.equals(that.timeseriesFunction) : + that.timeseriesFunction == null; } @Override public int hashCode() { int result = columnSchema.hashCode(); result = 31 * result + (aggFunction != null ? aggFunction.hashCode() : 0); + result = 31 * result + (timeseriesFunction != null ? timeseriesFunction.hashCode() : 0); result = 31 * result + (isFilterColumn ? 1 : 0); return result; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java deleted file mode 100644 index 02ff753..0000000 --- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunction.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.preagg; - -/** - * enum for timeseries function - */ -public enum TimeSeriesFunction { - SECOND("second"), - MINUTE("minute"), - HOUR("hour"), - DAY("day"), - MONTH("month"), - YEAR("year"); - - private String name; - - TimeSeriesFunction(String name) { - this.name = name; - } - - public String getName() { - return name; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java new file mode 100644 index 0000000..5d0d2af --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesFunctionEnum.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.preagg; + +/** + * enum for timeseries function + */ +public enum TimeSeriesFunctionEnum { + SECOND("second", 0), + MINUTE("minute", 1), + HOUR("hour", 2), + DAY("day", 3), + MONTH("month", 4), + YEAR("year", 5); + + /** + * name of the function + */ + private String name; + + /** + * ordinal for function + */ + private int ordinal; + + TimeSeriesFunctionEnum(String name, int ordinal) { + this.name = name; + this.ordinal = ordinal; + } + + public String getName() { + return name; + } + + public int getOrdinal() { + return ordinal; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java index 50cb052..3aa4190 100644 --- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java +++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java @@ -66,8 +66,9 @@ public class TimeSeriesUDF { Calendar calendar = calanderThreadLocal.get(); calendar.clear(); calendar.setTimeInMillis(data.getTime()); - TimeSeriesFunction timeSeriesFunction = TimeSeriesFunction.valueOf(function); - switch (timeSeriesFunction) { + TimeSeriesFunctionEnum timeSeriesFunctionEnum = + TimeSeriesFunctionEnum.valueOf(function.toUpperCase()); + switch (timeSeriesFunctionEnum) { case SECOND: calendar.set(Calendar.MILLISECOND, 0); break; http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index ab85684..148098d 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -67,7 +67,9 @@ import org.apache.carbondata.core.metadata.blocklet.SegmentInfo; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; @@ -2297,5 +2299,35 @@ public final class CarbonUtil { return dataAndIndexSize; } + /** + * Utility function to check whether table has timseries datamap or not + * @param carbonTable + * @return timeseries data map present + */ + public static boolean hasTimeSeriesDataMap(CarbonTable carbonTable) { + List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList(); + for (DataMapSchema dataMapSchema : dataMapSchemaList) { + if (dataMapSchema instanceof AggregationDataMapSchema) { + return ((AggregationDataMapSchema) dataMapSchema).isTimeseriesDataMap(); + } + } + return false; + } + + /** + * Utility function to check whether table has aggregation datamap or not + * 
@param carbonTable + * @return timeseries data map present + */ + public static boolean hasAggregationDataMap(CarbonTable carbonTable) { + List<DataMapSchema> dataMapSchemaList = carbonTable.getTableInfo().getDataMapSchemaList(); + for (DataMapSchema dataMapSchema : dataMapSchemaList) { + if (dataMapSchema instanceof AggregationDataMapSchema) { + return true; + } + } + return false; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/resources/timeseriestest.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/timeseriestest.csv b/integration/spark-common-test/src/test/resources/timeseriestest.csv new file mode 100644 index 0000000..1674ac9 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/timeseriestest.csv @@ -0,0 +1,7 @@ +mytime,name,age +2016-2-23 01:01:30,vishal,10 +2016-2-23 01:01:40,kunal,20 +2016-2-23 01:01:50,shahid,30 +2016-2-23 01:02:30,kk,40 +2016-2-23 01:02:40,rahul,50 +2016-2-23 01:02:50,ravi,50 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index dc117a5..d84ec3b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -16,7 +16,6 @@ */ package org.apache.carbondata.integration.spark.testsuite.preaggregate -import org.apache.spark.sql.catalyst.catalog.CatalogRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala index b60e487..5cbcb26 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala @@ -1,7 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.carbondata.integration.spark.testsuite.timeseries import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, Ignore} class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala new file mode 100644 index 0000000..217edea --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.integration.spark.testsuite.timeseries + +import java.sql.Timestamp + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, Ignore} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +@Ignore +class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + sql("drop table if exists mainTable") + sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'") + sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") + } + + test("test Year level timeseries data validation") { + checkAnswer( sql("select * from maintable_agg0_year"), + Seq(Row(Timestamp.valueOf("2016-01-01 00:00:00.0"),200))) + } + + test("test month level timeseries data validation") { + checkAnswer( sql("select * from maintable_agg0_month"), + Seq(Row(Timestamp.valueOf("2016-02-01 00:00:00.0"),200))) + } + + test("test day level timeseries data validation") { + checkAnswer( sql("select * from maintable_agg0_day"), + Seq(Row(Timestamp.valueOf("2016-02-23 00:00:00.0"),200))) + } + + test("test hour level timeseries data validation") { + checkAnswer( sql("select * from maintable_agg0_hour"), + Seq(Row(Timestamp.valueOf("2016-02-23 01:00:00.0"),200))) + } + + test("test minute level timeseries data validation") { + checkAnswer( sql("select * from maintable_agg0_minute"), + Seq(Row(Timestamp.valueOf("2016-02-23 01:01:00.0"),60), + Row(Timestamp.valueOf("2016-02-23 01:02:00.0"),140))) + } + + test("test second level timeseries data validation") { + checkAnswer( sql("select * from maintable_agg0_second"), + Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"),10), + Row(Timestamp.valueOf("2016-02-23 01:01:40.0"),20), + Row(Timestamp.valueOf("2016-02-23 01:01:50.0"),30), + Row(Timestamp.valueOf("2016-02-23 01:02:30.0"),40), + Row(Timestamp.valueOf("2016-02-23 01:02:40.0"),50), + Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50))) + } + + override def afterAll: Unit = { + sql("drop table if exists mainTable") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala new file mode 100644 index 0000000..0990f87 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.integration.spark.testsuite.timeseries + +import org.apache.spark.sql.CarbonDatasourceHadoopRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + + +class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll { + + override def beforeAll: Unit = { + sql("drop table if exists mainTable") + sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED BY 'org.apache.carbondata.format'") + sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime") + } + test("test PreAggregate table selection 1") { + val df = sql("select dataTime from mainTable group by dataTime") + preAggTableValidator(df.queryExecution.analyzed, "maintable") + } + + test("test PreAggregate table selection 2") { + val df = sql("select timeseries(dataTime,'hour') from mainTable group by timeseries(dataTime,'hour')") + preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0_hour") + } + + test("test PreAggregate table selection 3") { + val df = sql("select timeseries(dataTime,'milli') from mainTable group by timeseries(dataTime,'milli')") + preAggTableValidator(df.queryExecution.analyzed, "maintable") + } + + test("test PreAggregate table selection 4") { + val df = sql("select timeseries(dataTime,'year') from mainTable group by timeseries(dataTime,'year')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_year") + } + + test("test PreAggregate table selection 5") { + val df = sql("select timeseries(dataTime,'day') from mainTable group by timeseries(dataTime,'day')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_day") + } + + test("test PreAggregate table selection 6") { + val df = sql("select timeseries(dataTime,'month') from mainTable group by timeseries(dataTime,'month')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_month") + } + + test("test PreAggregate table selection 7") { + val df = sql("select timeseries(dataTime,'minute') from mainTable group by timeseries(dataTime,'minute')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_minute") + } + + test("test PreAggregate table selection 8") { + val df = sql("select timeseries(dataTime,'second') from mainTable group by timeseries(dataTime,'second')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_second") + } + + test("test PreAggregate table selection 9") { + val df = sql("select 
timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour") + } + + test("test PreAggregate table selection 10") { + val df = sql("select timeseries(dataTime,'hour') from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour") + } + + test("test PreAggregate table selection 11") { + val df = sql("select timeseries(dataTime,'hour'),sum(age) from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour") + } + + test("test PreAggregate table selection 12") { + val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable where timeseries(dataTime,'hour')='x' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')") + preAggTableValidator(df.queryExecution.analyzed,"maintable_agg0_hour") + } + + test("test PreAggregate table selection 13") { + val df = sql("select timeseries(dataTime,'hour')as hourlevel,sum(age) as sum from mainTable where timeseries(dataTime,'hour')='x' and name='vishal' group by timeseries(dataTime,'hour') order by timeseries(dataTime,'hour')") + preAggTableValidator(df.queryExecution.analyzed,"maintable") + } + + def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={ + var isValidPlan = false + plan.transform { + // first check if any preaTable1 scala function is applied it is present is in plan + // then call is from create preaTable1regate table class so no need to transform the query plan + case ca:CarbonRelation => + if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) { + val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation] + if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) { + isValidPlan = true + } + } + ca + case logicalRelation:LogicalRelation => + if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) { + val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] + if(relation.carbonTable.getTableName.equalsIgnoreCase(actualTableName)) { + isValidPlan = true + } + } + logicalRelation + } + if(!isValidPlan) { + assert(false) + } else { + assert(true) + } + } + + override def afterAll: Unit = { + sql("drop table if exists mainTable") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index 7cc3d11..5f78397 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn +import org.apache.carbondata.core.util.CarbonUtil 
case class TransformHolder(rdd: Any, mataData: CarbonMetaData) @@ -41,7 +42,11 @@ object CarbonSparkUtil { f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) && !f.getDataType.isComplexType) } - CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap)) + CarbonMetaData(dimensionsAttr, + measureAttr, + carbonTable, + DictionaryMap(dictionary.toMap), + CarbonUtil.hasAggregationDataMap(carbonTable)) } def createCarbonRelation(tableInfo: TableInfo, tablePath: String): CarbonRelation = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index d68bc41..6317177 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql -import java.sql.Timestamp import java.util.concurrent.ConcurrentHashMap import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil +import org.apache.spark.sql.execution.command.timeseries.{TimeSeriesFunction} import org.apache.spark.sql.hive._ import org.apache.carbondata.common.logging.LogServiceFactory @@ -65,8 +64,7 @@ class CarbonEnv { sparkSession.udf.register("preAggLoad", () => "") // added for handling timeseries function like hour, minute, day , month , year - sparkSession.udf.register("timeseries", (timestamp: Timestamp, timeSeriesFunction: String) => - TimeSeriesUtil.timeSeriesUDF(timestamp, timeSeriesFunction)) + sparkSession.udf.register("timeseries", new TimeSeriesFunction) synchronized { if (!initialized) { // update carbon session parameters , preserve thread parameters http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala index 8e157fd..c1f9e8a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, ScalaUDF} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.command.DescribeTableCommand import org.apache.spark.sql.types.DataType @@ -84,4 +84,18 @@ object CarbonExpressions { } } } + + /** + * unapply method of Scala UDF + */ + object CarbonScalaUDF { + def unapply(expression: Expression): Option[(ScalaUDF)] = { + expression match { + case a: ScalaUDF => + Some(a) + case _ => + None + } + } + } } 
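
The listener and util changes below implement the load-time rollup policy: aggregation datamaps are sorted by their TimeSeriesFunctionEnum ordinal, and each timeseries level is loaded either from the main table or, when canSelectForRollup permits, from an already-loaded finer-grained child table. A simplified, hypothetical sketch of that selection (the real code operates on AggregationDataMapSchema from carbondata-core; the types here are toy stand-ins for illustration only):

    // Toy stand-in for AggregationDataMapSchema.
    case class AggMap(name: String, ordinal: Int,
        canSelectForRollup: AggMap => Boolean = _ => true)

    // Mirrors PreAggregateUtil.getRollupDataMapNameForTimeSeries: among the datamaps
    // already loaded for this segment, take the last one the current datamap can
    // roll up from; None means the load must read the main table instead.
    def rollupCandidate(loaded: Seq[AggMap], current: AggMap): Option[AggMap] =
      loaded.filter(current.canSelectForRollup).lastOption

    // Loading order comes from the enum ordinals (second=0, minute=1, hour=2,
    // day=3, month=4, year=5), so finer levels are always loaded first.
    val ordered = Seq("second", "minute", "hour", "day", "month", "year")
      .zipWithIndex.map { case (n, i) => AggMap(n, i) }
      .sortBy(_.ordinal)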
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 4315e05..747e447 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -18,19 +18,15 @@ package org.apache.spark.sql.execution.command.preaaggregate import scala.collection.JavaConverters._ +import scala.collection.mutable -import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand} -import org.apache.spark.sql.execution.command.AlterTableModel -import org.apache.spark.sql.CarbonSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand} +import org.apache.spark.sql.execution.command.AlterTableModel -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ -import org.apache.carbondata.processing.loading.model.CarbonLoadModel object LoadPostAggregateListener extends OperationEventListener { /** @@ -43,20 +39,47 @@ object LoadPostAggregateListener extends OperationEventListener { val sparkSession = loadEvent.sparkSession val carbonLoadModel = loadEvent.carbonLoadModel val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - if (table.hasDataMapSchema) { - for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) { + if (CarbonUtil.hasAggregationDataMap(table)) { + // getting all the aggergate datamap schema + val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala + .filter(_.isInstanceOf[AggregationDataMapSchema]) + .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]] + // sorting the datamap for timeseries rollup + val sortedList = aggregationDataMapList.sortBy(_.getOrdinal) + val parentTableName = table.getTableName + val databasename = table.getDatabaseName + val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema] + for (dataMapSchema: AggregationDataMapSchema <- sortedList) { val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName + val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) { + dataMapSchema.getProperties.get("CHILD_SELECT QUERY") + } else { + // for timeseries rollup policy + val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list, + dataMapSchema) + // if non of the rollup data map is selected hit the maintable and prepare query + if 
(tableSelectedForRollup.isEmpty) { + PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema, + parentTableName, + databasename) + } else { + // otherwise hit the select rollup datamap schema + PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema, + tableSelectedForRollup.get, + databasename) + } + } PreAggregateUtil.startDataLoadForDataMap( table, TableIdentifier(childTableName, Some(childDatabaseName)), - dataMapSchema.getProperties.get("CHILD_SELECT QUERY"), + childSelectQuery, carbonLoadModel.getSegmentId, validateSegments = false, sparkSession) + } } } - } } /** @@ -74,7 +97,7 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen val carbonTable = compactionEvent.carbonTable val compactionType = compactionEvent.carbonMergerMapping.campactionType val sparkSession = compactionEvent.sparkSession - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema => val childRelationIdentifier = dataMapSchema.getRelationIdentifier val alterTableModel = AlterTableModel(Some(childRelationIdentifier.getDatabaseName), @@ -120,7 +143,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener { val carbonTable = dataTypeChangePreListener.carbonTable val alterTableDataTypeChangeModel = dataTypeChangePreListener.alterTableDataTypeChangeModel val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList dataMapSchemas.asScala.foreach { dataMapSchema => val childColumns = dataMapSchema.getChildSchema.getListOfColumns @@ -170,7 +193,7 @@ object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent] val carbonTable = deleteSegmentByDatePreEvent.carbonTable if (carbonTable != null) { - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { throw new UnsupportedOperationException( "Delete segment operation is not supported on tables which have a pre-aggregate table. 
" + "Drop pre-aggregation table to continue") @@ -194,7 +217,7 @@ object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener { val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent] val carbonTable = tableEvent.carbonTable if (carbonTable != null) { - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { throw new UnsupportedOperationException( "Delete segment operation is not supported on tables which have a pre-aggregate table") } @@ -219,7 +242,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener { val carbonTable = dataTypeChangePreListener.carbonTable val alterTableDropColumnModel = dataTypeChangePreListener.alterTableDropColumnModel val columnsToBeDropped = alterTableDropColumnModel.columns - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList dataMapSchemas.asScala.foreach { dataMapSchema => val parentColumnNames = dataMapSchema.getChildSchema.getListOfColumns.asScala @@ -257,7 +280,7 @@ object PreAggregateRenameTablePreListener extends OperationEventListener { throw new UnsupportedOperationException( "Rename operation for pre-aggregate table is not supported.") } - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { throw new UnsupportedOperationException( "Rename operation is not supported for table with pre-aggregate tables") } @@ -275,7 +298,7 @@ object UpdatePreAggregatePreListener extends OperationEventListener { val tableEvent = event.asInstanceOf[UpdateTablePreEvent] val carbonTable = tableEvent.carbonTable if (carbonTable != null) { - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { throw new UnsupportedOperationException( "Update operation is not supported for tables which have a pre-aggregate table. Drop " + "pre-aggregate tables to continue.") @@ -299,7 +322,7 @@ object DeletePreAggregatePreListener extends OperationEventListener { val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent] val carbonTable = tableEvent.carbonTable if (carbonTable != null) { - if (carbonTable.hasDataMapSchema) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { throw new UnsupportedOperationException( "Delete operation is not supported for tables which have a pre-aggregate table. 
Drop " + "pre-aggregate tables to continue.") http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 851b851..5ad5308 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -38,7 +38,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.format.TableInfo import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -544,8 +544,10 @@ object PreAggregateUtil { def createChildSelectQuery(tableSchema: TableSchema, databaseName: String): String = { val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] - tableSchema.getListOfColumns.asScala.foreach { - a => if (a.getAggFunction.nonEmpty) { + val columns = tableSchema.getListOfColumns.asScala + .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) + columns.foreach { a => + if (a.getAggFunction.nonEmpty) { aggregateColumns += s"${a.getAggFunction match { case "count" => "sum" case _ => a.getAggFunction}}(${a.getColumnName})" @@ -558,4 +560,101 @@ object PreAggregateUtil { groupingExpressions.mkString(",") }" } + /** + * Below method will be used to get the select query when rollup policy is + * applied in case of timeseries table + * @param tableSchema + * main data map schema + * @param selectedDataMapSchema + * selected data map schema for rollup + * @return select query based on rolloup + */ + def createTimeseriesSelectQueryForRollup( + tableSchema: TableSchema, + selectedDataMapSchema: AggregationDataMapSchema, + databaseName: String): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + val columns = tableSchema.getListOfColumns.asScala + .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) + columns.foreach { a => + if (a.getAggFunction.nonEmpty) { + aggregateColumns += s"${a.getAggFunction match { + case "count" => "sum" + case others@_ => others}}(${selectedDataMapSchema.getAggChildColByParent( + a.getParentColumnTableRelations.get(0).getColumnName, a.getAggFunction).getColumnName})" + } else if (a.getTimeSeriesFunction.nonEmpty) { + groupingExpressions += s"timeseries(${ + selectedDataMapSchema + .getNonAggChildColBasedByParent(a.getParentColumnTableRelations. 
+ get(0).getColumnName).getColumnName + } , '${ a.getTimeSeriesFunction }')" + } else { + groupingExpressions += selectedDataMapSchema + .getNonAggChildColBasedByParent(a.getParentColumnTableRelations. + get(0).getColumnName).getColumnName + } + } + s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",") + } from $databaseName.${selectedDataMapSchema.getChildSchema.getTableName } " + + s"group by ${ groupingExpressions.mkString(",") }" + } + + /** + * Below method will be used to creating select query for timeseries + * for lowest level for aggergation like second level, in that case it will + * hit the maintable + * @param tableSchema + * data map schema + * @param parentTableName + * parent schema + * @return select query for loading + */ + def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema, + parentTableName: String, + databaseName: String): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + val columns = tableSchema.getListOfColumns.asScala + .filter(f => !f.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) + columns.foreach {a => + if (a.getAggFunction.nonEmpty) { + aggregateColumns += + s"${ a.getAggFunction }(${ a.getParentColumnTableRelations.get(0).getColumnName })" + } else if (a.getTimeSeriesFunction.nonEmpty) { + groupingExpressions += + s"timeseries(${ a.getParentColumnTableRelations.get(0).getColumnName },'${ + a.getTimeSeriesFunction}')" + } else { + groupingExpressions += a.getParentColumnTableRelations.get(0).getColumnName + } + } + s"select ${ groupingExpressions.mkString(",") },${ + aggregateColumns.mkString(",") + } from $databaseName.${ parentTableName } group by ${ groupingExpressions.mkString(",") }" + + } + /** + * Below method will be used to select rollup table in case of + * timeseries data map loading + * @param list + * list of timeseries datamap + * @param dataMapSchema + * datamap schema + * @return select table name + */ + def getRollupDataMapNameForTimeSeries( + list: scala.collection.mutable.ListBuffer[AggregationDataMapSchema], + dataMapSchema: AggregationDataMapSchema): Option[AggregationDataMapSchema] = { + if (list.isEmpty) { + None + } else { + val rollupDataMapSchema = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema] + list.foreach{f => + if (dataMapSchema.canSelectForRollup(f)) { + rollupDataMapSchema += f + } } + rollupDataMapSchema.lastOption + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala new file mode 100644 index 0000000..ad9ace7 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesFunction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.timeseries + +import java.sql.Timestamp + +import org.apache.carbondata.core.preagg.TimeSeriesUDF + +/** + * Time series udf class + */ + +class TimeSeriesFunction extends Function2[Timestamp, String, Timestamp] with Serializable{ + + override def apply(v1: Timestamp, + v2: String): Timestamp = { + TimeSeriesUDF.INSTANCE.applyUDF(v1, v2) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala index 9d4ce56..6a4ef56 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.command.timeseries -import java.sql.Timestamp - import org.apache.spark.sql.execution.command.{DataMapField, Field} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -142,18 +140,5 @@ object TimeSeriesUtil { obj._2.aggregateFunction.isEmpty) isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction } - - /** - * UDF for timeseries - * - * @param timestamp - * timestamp - * @param timeSeriesFunctionType - * time series function - * @return updated timestamp based on function - */ - def timeSeriesUDF(timestamp: Timestamp, timeSeriesFunctionType: String): Timestamp = { - TimeSeriesUDF.INSTANCE.applyUDF(timestamp, timeSeriesFunctionType) - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index abc58ff..f7a1eed 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSes import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.util.CarbonReflectionUtils @@ -61,7 +61,8 @@ case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) { case class CarbonMetaData(dims: Seq[String], msrs: Seq[String], carbonTable: CarbonTable, - dictionaryMap: DictionaryMap) + dictionaryMap: DictionaryMap, + hasAggregateDataMapSchema: Boolean) case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { def get(name: String): Option[Boolean] = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/e2a79eeb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 09e66de..4227dcb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -22,22 +22,22 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SPARK_VERSION import org.apache.spark.sql._ -import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias +import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF, SortOrder} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CarbonException -import org.apache.spark.sql.CarbonExpressions.MatchCast -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema} import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.spark.util.CarbonScalaUtil /** @@ -69,6 +69,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil * 5. Order By Query rules. * 5.1 Update project list based on updated aggregate expression * 5.2 Update sort order attributes based on pre aggregate table + * 6. 
timeseries function + * 6.1 validate maintable has timeseries datamap + * 6.2 timeseries function is valid function or not * * @param sparkSession * spark session @@ -115,8 +118,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // only carbon query plan is supported checking whether logical relation is // is for carbon if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) // if it is valid plan then extract the query columns isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, @@ -136,8 +139,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // only carbon query plan is supported checking whether logical relation is // is for carbon if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) // if it is valid plan then extract the query columns isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, @@ -148,11 +151,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) // getting the columns from filter expression if(isValidPlan) { - filterExp.transform { - case attr: AttributeReference => - list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) - attr - } + isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) } carbonTable @@ -162,8 +161,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // only carbon query plan is supported checking whether logical relation is // is for carbon if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) // if it is valid plan then extract the query columns isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, @@ -180,8 +179,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, aggregateExp, @@ -201,8 +200,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, aggregateExp, @@ -213,11 +212,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule if (isValidPlan) { list ++ extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - filterExp.transform { - case attr: AttributeReference => - list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) - attr - } + isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) } carbonTable // case for handling aggregation with order by when only projection column exits @@ -227,8 +222,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, aggregateExp, @@ -248,8 +243,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, aggregateExp, @@ -261,11 +256,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders, carbonTable = carbonTable, tableName = tableName) - filterExp.transform { - case attr: AttributeReference => - list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) - attr - } + isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) } carbonTable case _ => @@ -321,6 +312,65 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } /** + * Below method will be used to extract the query columns from + * filter expression + * @param filterExp + * filter expression + * @param set + * query column list + * @param carbonTable + * parent table + * @param tableName + * table name + * @return isvalid filter expression for aggregate + */ + def extractQueryColumnFromFilterExp(filterExp: Expression, + set: scala.collection.mutable.HashSet[QueryColumn], + carbonTable: CarbonTable, tableName: String): Boolean = { + // map to maintain attribute reference present in the filter to timeseries function + // if applied this is added to avoid duplicate column + val mapOfColumnSeriesFun = scala.collection.mutable.HashMap.empty[AttributeReference, String] + var isValidPlan = true + filterExp.transform { + case attr: AttributeReference => + if (!mapOfColumnSeriesFun.get(attr).isDefined) { + mapOfColumnSeriesFun.put(attr, null) + } + attr + case udf@CarbonScalaUDF(_) => + // for handling timeseries function + if (udf.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase( + "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") && + CarbonUtil.hasTimeSeriesDataMap(carbonTable)) { + mapOfColumnSeriesFun.put(udf.children(0).asInstanceOf[AttributeReference], + udf.children(1).asInstanceOf[Literal].value.toString) + } else { + // for any other scala udf + udf.transform { + case attr: AttributeReference => + if (!mapOfColumnSeriesFun.get(attr).isDefined) { + mapOfColumnSeriesFun.put(attr, null) + } + attr + } + } + udf + } + mapOfColumnSeriesFun.foreach { f => + if (f._2 == null) { + set += + getQueryColumn(f._1.name, carbonTable, tableName, isFilterColumn = true) + } else { + set += getQueryColumn(f._1.name, + carbonTable, + carbonTable.getTableName, + isFilterColumn = true, + timeseriesFunction = f._2) + } + } + isValidPlan + } + /** * Below method will be used to extract columns from order by expression * @param projectList * project list from plan @@ -383,13 +433,18 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule attributeReference: AttributeReference, attributes: Seq[AttributeReference], aggFunction: String = "", - canBeNull: Boolean = false): AttributeReference = { + canBeNull: Boolean = false, + timeseriesFunction: String = ""): AttributeReference = { val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema]; - val columnSchema = if (aggFunction.isEmpty) { + val columnSchema = if (aggFunction.isEmpty && timeseriesFunction.isEmpty) { aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase) - } else { + } else if (!aggFunction.isEmpty) { aggregationDataMapSchema.getAggChildColByParent(attributeReference.name.toLowerCase, - aggFunction.toLowerCase) + 
aggFunction) + } else { + aggregationDataMapSchema + .getTimeseriesChildColByParent(attributeReference.name.toLowerCase, + timeseriesFunction) } // here column schema cannot be null, if it is null then aggregate table selection // logic has some problem @@ -427,6 +482,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * 5. Order by plan rules. * 5.1 Update project list based on updated aggregate expression * 5.2 Update sort order attributes based on pre aggregate table + * 6. timeseries function + * 6.1 validate parent table has timeseries datamap + * 6.2 timeseries function is valid function or not * * @param logicalPlan * parent logical plan @@ -444,7 +502,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // case for aggregation query case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation)) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, None) = getUpdatedExpressions(grExp, aggExp, @@ -461,7 +520,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggExp, Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = getUpdatedExpressions(grExp, aggExp, @@ -477,7 +537,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // case for aggregation query case Aggregate(grExp, aggExp, l: LogicalRelation) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, None) = getUpdatedExpressions(grExp, aggExp, @@ -497,7 +558,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, None) = getUpdatedExpressions(groupingExp, aggregateExp, @@ -520,7 +582,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation))))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = getUpdatedExpressions(groupingExp, aggregateExp, @@ -544,8 +607,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, subQuery@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable - .hasDataMapSchema => + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, None) = getUpdatedExpressions(groupingExp, aggregateExp, @@ -566,7 +629,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggregateExp, Filter(expression, subQuery@CarbonSubqueryAlias(_, l: LogicalRelation)))) if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema => + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) = getUpdatedExpressions(groupingExp, aggregateExp, @@ -615,6 +679,11 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule alias.qualifier, alias.isGenerated) alias + case alias@Alias(exp: Expression, name) => + updatedProjectList += AttributeReference(name, exp.dataType, exp.nullable)(alias.exprId, + alias.qualifier, + alias.isGenerated) + alias } } // getting the updated sort order @@ -724,6 +793,31 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Alias(aggExp, name)(NamedExpression.newExprId, alias.qualifier).asInstanceOf[NamedExpression] + case alias@Alias(expression: Expression, name) => + val updatedExp = + if (expression.isInstanceOf[ScalaUDF] && + expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase( + "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction")) { + expression.asInstanceOf[ScalaUDF].transform { + case attr: AttributeReference => + val childAttributeReference = getChildAttributeReference(aggDataMapSchema, + attr, + attributes, + timeseriesFunction = + expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value.toString) + childAttributeReference + } + } else { + expression.transform{ + case attr: AttributeReference => + val childAttributeReference = getChildAttributeReference(aggDataMapSchema, + attr, + attributes) + childAttributeReference + } + } + Alias(updatedExp, name)(NamedExpression.newExprId, + alias.qualifier).asInstanceOf[NamedExpression] } // transformaing the logical relation val newChild = child.transform { @@ -763,11 +857,18 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // in case of alias we need to match with alias name and when alias is not present // we need to compare with attribute reference name case alias@Alias(attr: AttributeReference, name) - if attr.name.equals(sortOrderAttr.name) || name.equals(sortOrderAttr.name) => + if attr.name.equalsIgnoreCase(sortOrderAttr.name) || + name.equalsIgnoreCase(sortOrderAttr.name) => AttributeReference(name, - attr.dataType, - attr.nullable, - attr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated) + sortOrderAttr.dataType, + sortOrderAttr.nullable, + 
sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated) + case alias@Alias(_: Expression, name) + if name.equalsIgnoreCase(sortOrderAttr.name) => + AttributeReference(name, + sortOrderAttr.dataType, + sortOrderAttr.nullable, + sortOrderAttr.metadata)(alias.exprId, alias.qualifier, alias.isGenerated) } // any case it will match the condition, so no need to check whether updated expression is empty // or not @@ -933,7 +1034,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case attr: AttributeReference => set += getQueryColumn(attr.name, carbonTable, - tableName); + tableName) case Alias(attr: AttributeReference, _) => set += getQueryColumn(attr.name, carbonTable, @@ -950,6 +1051,26 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } else { return false } + case Alias(expression: Expression, _) => + if (expression.isInstanceOf[ScalaUDF] && + expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase( + "org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") && + CarbonUtil.hasTimeSeriesDataMap(carbonTable)) { + set += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0) + .asInstanceOf[AttributeReference].name, + carbonTable, + tableName, + timeseriesFunction = expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal] + .value.toString) + } else { + expression.transform { + case attr: AttributeReference => + set += getQueryColumn(attr.name, + carbonTable, + tableName) + attr + } + } } true } @@ -1048,6 +1169,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + + /** * Below method will be used to get the query column object which * will have details of the column and its property @@ -1074,7 +1197,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule aggFunction: String = "", dataType: String = "", isChangedDataType: Boolean = false, - isFilterColumn: Boolean = false): QueryColumn = { + isFilterColumn: Boolean = false, + timeseriesFunction: String = ""): QueryColumn = { val columnSchema = carbonTable.getColumnByName(tableName, columnName.toLowerCase) if(null == columnSchema) { null @@ -1083,11 +1207,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule new QueryColumn(columnSchema.getColumnSchema, columnSchema.getDataType.getName, aggFunction.toLowerCase, - isFilterColumn) + isFilterColumn, + timeseriesFunction.toLowerCase) } else { new QueryColumn(columnSchema.getColumnSchema, - CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType), - aggFunction.toLowerCase, isFilterColumn) + CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType), + aggFunction.toLowerCase, + isFilterColumn, + timeseriesFunction.toLowerCase) } } }
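
The new TimeSeriesFunction class added by this commit is a thin wrapper that delegates to TimeSeriesUDF.INSTANCE.applyUDF, so a query expression such as timeseries(mytime, 'hour') groups rows by their hour bucket. The sketch below is illustrative only and is not the CarbonData implementation: TimeSeriesTruncate and its toHour method are hypothetical stand-ins showing the kind of truncation the UDF is expected to perform for the HOUR granularity.

import java.sql.Timestamp
import java.util.Calendar

// Hypothetical helper, only to illustrate the expected semantics of
// TimeSeriesUDF.applyUDF for the HOUR granularity: zero out the minute,
// second and millisecond fields so all rows in the same hour share one key.
object TimeSeriesTruncate {
  def toHour(ts: Timestamp): Timestamp = {
    val cal = Calendar.getInstance()
    cal.setTimeInMillis(ts.getTime)
    cal.set(Calendar.MINUTE, 0)
    cal.set(Calendar.SECOND, 0)
    cal.set(Calendar.MILLISECOND, 0)
    new Timestamp(cal.getTimeInMillis)
  }
}

object TimeSeriesTruncateDemo extends App {
  // prints 2017-12-05 20:00:00.0
  println(TimeSeriesTruncate.toHour(Timestamp.valueOf("2017-12-05 20:56:58")))
}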

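For the load path, createTimeSeriesSelectQueryFromMain (shown earlier in PreAggregateUtil.scala) builds a group-by query against the main table, wrapping the event-time column in the timeseries UDF and re-applying the aggregate functions. A minimal sketch of the string that method would assemble, assuming a main table default.sales(mytime timestamp, price int) with an hour-level timeseries datamap that maintains sum(price); the table and column names here are assumptions, not taken from the commit:

// Illustrative only; mirrors the string construction in
// createTimeSeriesSelectQueryFromMain with hard-coded sample inputs.
object TimeSeriesLoadQuerySketch extends App {
  val databaseName = "default"
  val parentTableName = "sales"
  val groupingExpressions = Seq("timeseries(mytime,'hour')")
  val aggregateColumns = Seq("sum(price)")
  val loadQuery =
    s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",") } " +
      s"from $databaseName.$parentTableName group by ${ groupingExpressions.mkString(",") }"
  // select timeseries(mytime,'hour'),sum(price) from default.sales
  //   group by timeseries(mytime,'hour')
  println(loadQuery)
}

Higher granularities (day, month, year) can then be loaded by rolling up from an already-loaded finer-level datamap instead of the main table, which appears to be what getRollupDataMapNameForTimeSeries does by returning the last candidate datamap that qualifies for rollup.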