ajantha-bhat commented on a change in pull request #3861:
URL: https://github.com/apache/carbondata/pull/3861#discussion_r481197993



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -684,21 +730,71 @@ class CarbonSecondaryIndexOptimizer(sparkSession: 
SparkSession) {
       .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
     val transformChild = false
     var addProjection = needProjection
+    // to store the sort node per query
+    var sortNodeForPushDown: Sort = null
+    // to store the limit literal per query
+    var limitLiteral: Literal = null
+    // by default do not push down notNull filter,
+    // but for orderby limit push down, push down notNull filter also. Else we 
get wrong results.
+    var pushDownNotNullFilter: Boolean = false
     val transformedPlan = transformPlan(plan, {
-      case union@Union(children) =>
+      case union@Union(_) =>
         // In case of Union, Extra Project has to be added to the Plan. 
Because if left table is
         // pushed to SI and right table is not pushed, then Output Attribute 
mismatch will happen
         addProjection = true
         (union, true)
-      case sort@Sort(order, global, plan) =>
+      case sort@Sort(_, _, _) =>
         addProjection = true
         (sort, true)
-      case filter@Filter(condition, 
logicalRelation@MatchIndexableRelation(indexableRelation))
+      case limit@Limit(literal: Literal, sort@Sort(_, _, child)) =>
+        child match {
+          case filter: Filter =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, 
filter)) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case p: Project if (p.child.isInstanceOf[Filter]) =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+              sort,
+              p.child.asInstanceOf[Filter])) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case _ =>
+        }
+        (limit, transformChild)
+      case limit@Limit(literal: Literal, _@Project(_, child)) if 
child.isInstanceOf[Sort] =>

Review comment:
       done

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
##########
@@ -824,6 +926,42 @@ class CarbonSecondaryIndexOptimizer(sparkSession: 
SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, 
sort: Sort,
+      filter: Filter): Boolean = {
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val parentTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      
CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = parentTableRelation.carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 1. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size 
== 1) {
+      // 2. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), 
enabledMatchingIndexTables.head)(sparkSession)
+      if (sortColumns.forall { x => indexCarbonTable.getColumnByName(x) != 
null }) {

Review comment:
       yeah. done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to