Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155279540
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
---
@@ -493,4 +492,102 @@ object PreAggregateUtil {
updatedPlan
}
+  /**
+   * Builds the select query that loads a timeseries datamap by rolling up a
+   * previously loaded datamap (`selectedDataMapSchema`) instead of the main table.
+   *
+   * For each column of `tableSchema`:
+   *  - a column with an aggregate function becomes `fn(childColumn)`, where
+   *    `childColumn` is the matching aggregate column of the rollup source;
+   *    `count` is rewritten to `sum` so the source's stored counts are added up;
+   *  - a column with a timeseries function becomes
+   *    `timeseries(childColumn , 'function')` and is grouped on;
+   *  - any other column is grouped on by its child column name.
+   *
+   * Empty select-list or group-by parts are left out, so the query never
+   * contains a dangling comma or an empty `group by`.
+   *
+   * @param tableSchema           schema of the timeseries datamap being loaded
+   * @param selectedDataMapSchema datamap schema selected as the rollup source
+   * @return select query over the child table of `selectedDataMapSchema`
+   */
+  def createTimeseriesSelectQueryForRollup(
+      tableSchema: TableSchema,
+      selectedDataMapSchema: AggregationDataMapSchema): String = {
+    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+    tableSchema.getListOfColumns.asScala.foreach { column =>
+      val parentColumnName = column.getParentColumnTableRelations.get(0).getColumnName
+      if (column.getAggFunction.nonEmpty) {
+        // a count in the rollup source is rolled up by summing it
+        val rollupFunction = column.getAggFunction match {
+          case "count" => "sum"
+          case other => other
+        }
+        val childColumnName = selectedDataMapSchema
+          .getAggChildColByParent(parentColumnName, column.getAggFunction)
+          .getColumnName
+        aggregateColumns += s"$rollupFunction($childColumnName)"
+      } else if (column.getTimeSeriesFunction.nonEmpty) {
+        val childColumnName = selectedDataMapSchema
+          .getNonAggNonTimeChildColBasedByParent(parentColumnName)
+          .getColumnName
+        groupingExpressions +=
+          s"timeseries($childColumnName , '${ column.getTimeSeriesFunction }')"
+      } else {
+        groupingExpressions += selectedDataMapSchema
+          .getNonAggNonTimeChildColBasedByParent(parentColumnName)
+          .getColumnName
+      }
+    }
+    // joining both lists together avoids a stray comma when either one is empty
+    val selectList = (groupingExpressions ++ aggregateColumns).mkString(",")
+    val groupByClause =
+      if (groupingExpressions.isEmpty) "" else s" group by ${ groupingExpressions.mkString(",") }"
+    s"select $selectList from ${ selectedDataMapSchema.getChildSchema.getTableName }$groupByClause"
+  }
+
+  /**
+   * Builds the select query that loads a timeseries datamap directly from the
+   * parent (main) table. This is used for the finest granularity, for which
+   * there is no finer datamap to roll up from.
+   *
+   * For each column of `tableSchema`:
+   *  - a column with an aggregate function becomes `fn(parentColumn)`;
+   *  - a column with a timeseries function becomes
+   *    `timeseries(parentColumn,'function')` and is grouped on;
+   *  - any other column is grouped on by its parent column name.
+   *
+   * Empty select-list or group-by parts are left out, so the query never
+   * contains a dangling comma or an empty `group by`.
+   *
+   * @param tableSchema     schema of the timeseries datamap being loaded
+   * @param parentTableName name of the parent table to select from
+   * @return select query used for loading the datamap
+   */
+  def createTimeSeriesSelectQueryFromMain(tableSchema: TableSchema,
+      parentTableName: String): String = {
+    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+    tableSchema.getListOfColumns.asScala.foreach { column =>
+      val parentColumnName = column.getParentColumnTableRelations.get(0).getColumnName
+      if (column.getAggFunction.nonEmpty) {
+        aggregateColumns += s"${ column.getAggFunction }($parentColumnName)"
+      } else if (column.getTimeSeriesFunction.nonEmpty) {
+        groupingExpressions +=
+          s"timeseries($parentColumnName,'${ column.getTimeSeriesFunction }')"
+      } else {
+        groupingExpressions += parentColumnName
+      }
+    }
+    // joining both lists together avoids a stray comma when either one is empty
+    val selectList = (groupingExpressions ++ aggregateColumns).mkString(",")
+    val groupByClause =
+      if (groupingExpressions.isEmpty) "" else s" group by ${ groupingExpressions.mkString(",") }"
+    s"select $selectList from $parentTableName$groupByClause"
+  }
+ /**
+ * Below method will be used to select rollup table in case of
+ * timeseries data map loading
+ * @param list
+ * list of timeseries datamap
+ * @param dataMapSchema
+ * datamap schema
+ * @return select table name
+ */
+ def getRollupDataMapNameForTimeSeries(
+ list: scala.collection.mutable.ListBuffer[AggregationDataMapSchema],
+ dataMapSchema: AggregationDataMapSchema):
Option[AggregationDataMapSchema] = {
+ if (list.isEmpty) {
+ None
+ } else {
+ val rollupDataMapSchema =
scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
+ list.foreach{f =>
+ if (dataMapSchema.canSelectForRollup(f)) {
+ rollupDataMapSchema += f
+ } }
+ if(rollupDataMapSchema.isEmpty) {
--- End diff --
Use `lastOption` here instead of the if/else.
---