ajantha-bhat commented on a change in pull request #3902:
URL: https://github.com/apache/carbondata/pull/3902#discussion_r481694795
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
##########
@@ -373,6 +375,146 @@ object CarbonFilters {
val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
getPartitions(partitionFilters, sparkSession, carbonTable)
}
+
+ def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = {
+ val column = filter.references.map(carbonTable.getColumnByName)
+ if (column.isEmpty) {
+ -1
+ } else {
+ if (column.head.isDimension) {
+ column.head.getOrdinal
+ } else {
+ column.head.getOrdinal + carbonTable.getAllDimensions.size()
+ }
+ }
+ }
+
+ def collectSimilarExpressions(filter: Filter, table: CarbonTable):
Seq[(Filter, Int)] = {
+ filter match {
+ case sources.And(left, right) =>
+ collectSimilarExpressions(left, table) ++
collectSimilarExpressions(right, table)
+ case sources.Or(left, right) => collectSimilarExpressions(left, table) ++
+ collectSimilarExpressions(right, table)
+ case others => Seq((others, getStorageOrdinal(others, table)))
+ }
+ }
+
+ /**
+ * This method will reorder the filter based on the Storage Ordinal of the
column references.
+ *
+ * Example1:
+ * And And
+ * Or And => Or And
+ * col3 col1 col2 col1 col1 col3 col1 col2
+ *
+ * **Mixed expression filter reordered locally, but wont be reordered
globally.**
+ *
+ * Example2:
+ * And And
+ * And And => And And
+ * col3 col1 col2 col1 col1 col1 col2 col3
Review comment:
On the right-hand side (after optimization) this becomes (col1 And col1); would it
be better to simplify the filter to just col1?
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
##########
@@ -373,6 +375,146 @@ object CarbonFilters {
val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
getPartitions(partitionFilters, sparkSession, carbonTable)
}
+
+ def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = {
+ val column = filter.references.map(carbonTable.getColumnByName)
+ if (column.isEmpty) {
+ -1
+ } else {
+ if (column.head.isDimension) {
+ column.head.getOrdinal
+ } else {
+ column.head.getOrdinal + carbonTable.getAllDimensions.size()
+ }
+ }
+ }
+
+ def collectSimilarExpressions(filter: Filter, table: CarbonTable):
Seq[(Filter, Int)] = {
+ filter match {
+ case sources.And(left, right) =>
+ collectSimilarExpressions(left, table) ++
collectSimilarExpressions(right, table)
+ case sources.Or(left, right) => collectSimilarExpressions(left, table) ++
+ collectSimilarExpressions(right, table)
+ case others => Seq((others, getStorageOrdinal(others, table)))
+ }
+ }
+
+ /**
+ * This method will reorder the filter based on the Storage Ordinal of the
column references.
+ *
+ * Example1:
+ * And And
+ * Or And => Or And
+ * col3 col1 col2 col1 col1 col3 col1 col2
+ *
+ * **Mixed expression filter reordered locally, but wont be reordered
globally.**
+ *
+ * Example2:
+ * And And
+ * And And => And And
+ * col3 col1 col2 col1 col1 col1 col2 col3
+ *
+ * Or Or
+ * Or Or => Or Or
+ * col3 col1 col2 col1 col1 col1 col2 col3
+ *
+ * **Similar expression filters are reordered globally**
+ *
+ * @param filter the filter expression to be reordered
+ * @return The reordered filter with the current ordinal
+ */
+ def reorderFilter(filter: Filter, table: CarbonTable): (Filter, Int) = {
+ val filterMap = mutable.HashMap[String, List[(Filter, Int)]]()
+ def sortFilter(filter: Filter): (Filter, Int) = {
Review comment:
Agree with David. I too felt it is not easy to read.
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
##########
@@ -72,7 +72,9 @@ case class CarbonDatasourceHadoopRelation(
projects: Seq[NamedExpression],
filters: Array[Filter],
partitions: Seq[PartitionSpec]): RDD[InternalRow] = {
- val filterExpression: Option[Expression] = filters.flatMap { filter =>
+ val reorderedFilter = filters.map(CarbonFilters.reorderFilter(_,
carbonTable)).sortBy(_._2)
Review comment:
Would it be better to do this at a lower layer (common code), so that Presto and
Hive queries can also make use of it?
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
##########
@@ -373,6 +375,146 @@ object CarbonFilters {
val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
getPartitions(partitionFilters, sparkSession, carbonTable)
}
+
+ def getStorageOrdinal(filter: Filter, carbonTable: CarbonTable): Int = {
+ val column = filter.references.map(carbonTable.getColumnByName)
+ if (column.isEmpty) {
+ -1
+ } else {
+ if (column.head.isDimension) {
+ column.head.getOrdinal
+ } else {
+ column.head.getOrdinal + carbonTable.getAllDimensions.size()
+ }
+ }
+ }
+
+ def collectSimilarExpressions(filter: Filter, table: CarbonTable):
Seq[(Filter, Int)] = {
+ filter match {
+ case sources.And(left, right) =>
+ collectSimilarExpressions(left, table) ++
collectSimilarExpressions(right, table)
+ case sources.Or(left, right) => collectSimilarExpressions(left, table) ++
+ collectSimilarExpressions(right, table)
+ case others => Seq((others, getStorageOrdinal(others, table)))
+ }
+ }
+
+ /**
+ * This method will reorder the filter based on the Storage Ordinal of the
column references.
Review comment:
In the case of multiple AND filters, we need to first read the column that
produces less output, so that this output can be passed to the other filters
through the bitset pipeline. So always reordering to storage order may not be a
good idea, because in some cases the user knows the cardinality and can provide
an efficient filter order.
@kumarvishal09, @QiangCai, @ravipesala: what is your opinion on this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]