Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1626#discussion_r155282532
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
---
@@ -321,6 +313,62 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
}
}
+ /**
+ * Below method will be used to extract the query columns from
+ * filter expression
+ * @param filterExp
+ * filter expression
+ * @param set
+ * query column list
+ * @param carbonTable
+ * parent table
+ * @param tableName
+ * table name
+ * @return isvalid filter expression for aggregate
+ */
+ def extractQueryColumnFromFilterExp(filterExp: Expression,
+ set: scala.collection.mutable.HashSet[QueryColumn],
+ carbonTable: CarbonTable, tableName: String): Boolean = {
+    val newFilterList = scala.collection.mutable.HashMap.empty[AttributeReference, String]
+ var isValidPlan = true
+ filterExp.transform {
+ case attr: AttributeReference =>
+ if (!newFilterList.get(attr).isDefined) {
+ newFilterList.put(attr, null)
+ }
+ attr
+ case udf@ScalaUDF(_, _, _, _) =>
+        if (udf.function.getClass.getName
+          .equalsIgnoreCase("org.apache.spark.sql.execution.command.timeseries.TimeseriesUDf") &&
+            carbonTable.hasTimeSeriesDataMap) {
+          newFilterList.put(udf.children(0).asInstanceOf[AttributeReference],
+            udf.children(1).asInstanceOf[Literal].value.toString)
+ } else {
+ udf.transform {
+ case attr: AttributeReference =>
+ if (!newFilterList.get(attr).isDefined) {
+ newFilterList.put(attr, null)
+ }
+ attr
+ }
+ }
+ udf
+ }
+ newFilterList.foreach {
+ f =>
--- End diff --
format properly
---