This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a59aec7  [CARBONDATA-3922] Support order by limit push down for secondary index queries
a59aec7 is described below

commit a59aec7e33666b827afc887b51823fa6266ce8fe
Author: ajantha-bhat <[email protected]>
AuthorDate: Thu Jul 23 17:40:19 2020 +0530

    [CARBONDATA-3922] Support order by limit push down for secondary index queries
    
    Why is this PR needed?
    a) Limit pushdown for SI is already supported, but the limit was also pushed
    down when the order by column is not an SI column. This needs to be fixed.
    b) When a limit is present and both the order by column and all the filter
    columns are SI columns, the order by + limit can be pushed down.
    This reduces the SI output results and the scan time on the main table.
    c) The SI transformation rule is applied even when no relation contains an SI.
    
    What changes were proposed in this PR?
    a) Block limit push down if the order by column is not an SI column.
    b) When a limit is present and both the order by column and all the filter
    columns are SI columns, push down the order by + limit (see the example
    sketch just below).
    c) Apply the SI transformation rule only when at least one relation contains an SI.
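
    For illustration, a minimal sketch of the affected query shapes (hypothetical
    table and index names, mirroring the new test case):

      CREATE TABLE t (imsi STRING, carno STRING, starttime BIGINT, endtime BIGINT)
        STORED AS carbondata;
      CREATE INDEX t_idx ON TABLE t(carno, starttime) AS 'carbondata';

      -- order by + limit is pushed down to the SI table: the filter column (carno)
      -- and the sort column (starttime) are both SI columns
      SELECT * FROM t WHERE carno = 'ka14' ORDER BY starttime LIMIT 1;

      -- only the filter is pushed down: endtime is not an SI column,
      -- so the limit is no longer pushed down along with the order by
      SELECT * FROM t WHERE carno = 'ka14' ORDER BY endtime LIMIT 1;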
    
    This closes #3861
---
 .../secondaryindex/TestSIWithSecondryIndex.scala   |  66 ++++++-
 .../optimizer/CarbonSITransformationRule.scala     |  15 +-
 .../optimizer/CarbonSecondaryIndexOptimizer.scala  | 196 +++++++++++++++++----
 3 files changed, 242 insertions(+), 35 deletions(-)

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 3986783..47a2110 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -18,9 +18,10 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
-.isFilterPushedDownToSI;
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
@@ -29,7 +30,6 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
-
 import org.apache.spark.sql.test.util.QueryTest
 
 class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
@@ -319,6 +319,66 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists maintable")
   }
 
+  test("test SI order by limit push down") {
+    sql("drop table if exists table2")
+    sql("CREATE TABLE `table2` (`imsi` STRING, `carno` STRING, `longitude` 
STRING, `city` " +
+      "STRING, `starttime` BIGINT, `endtime` BIGINT) STORED AS carbondata 
TBLPROPERTIES" +
+      "('sort_scope'='global_sort','sort_columns'='starttime')")
+    sql("create index table2_index1 on table table2(carno, longitude, 
starttime) as 'carbondata'")
+    sql("create index table2_index2 on table table2(city) as 'carbondata'")
+    sql("insert into table2 select 'aa','ka14','ll','abc',23,24 ")
+    sql("insert into table2 select 'aa','ka14','ll','xyz',25,26 ")
+
+    // Allow order by and limit pushdown as all the filter and order by column is in SI
+    // a. For selected projections
+    var plan = sql(
+      "explain SELECT imsi FROM table2 WHERE  CARNO = 'ka14' AND LONGITUDE is 
not null  ORDER BY " +
+      "STARTTIME LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 2)
+
+    // b. For all projections
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE  CARNO = 'ka14' AND LONGITUDE is not 
null  ORDER BY " +
+      "STARTTIME LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 2)
+
+    // Don't allow orderby and limit pushdown as order by column is not an SI column
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE  CARNO = 'ka14' AND LONGITUDE is not 
null  ORDER BY " +
+      "endtime LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 1)
+
+    // Don't allow orderby and limit pushdown as filter column is not an SI column
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE  imsi = 'aa' AND LONGITUDE is not 
null  ORDER BY " +
+      "STARTTIME LIMIT 1")
+      .collect()(0)
+      .toString()
+    assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 1)
+
+    // just NotEqual to should not be pushed down to SI without order by
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE  CARNO != 'ka14' ")
+      .collect()(0)
+      .toString()
+    assert(!plan.contains("table2_index1"))
+
+    // NotEqual to should not be pushed down to SI without order by in case of multiple tables also
+    plan = sql(
+      "explain SELECT * FROM table2 WHERE  CARNO = 'ka14' and CITY != 'ddd' ")
+      .collect()(0)
+      .toString()
+    assert(!plan.contains("table2_index2") && plan.contains("table2_index1"))
+
+    sql("drop table table2")
+  }
+
   override def afterAll {
     sql("drop index si_altercolumn on table_WithSIAndAlter")
     sql("drop table if exists table_WithSIAndAlter")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
index c802060..dba8ff2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.secondaryindex.optimizer
 
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonUtils, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -28,6 +28,7 @@ import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.index.IndexType
 
 /**
  * Rule for rewriting plan if query has a filter on index table column
@@ -41,7 +42,17 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
     new CarbonSecondaryIndexOptimizer(sparkSession)
 
   def apply(plan: LogicalPlan): LogicalPlan = {
-    if (checkIfRuleNeedToBeApplied(plan)) {
+    var hasSecondaryIndexTable = false
+    plan.collect {
+      case l: LogicalRelation if (!hasSecondaryIndexTable &&
+                                  l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) =>
+        hasSecondaryIndexTable = l.relation
+                       .asInstanceOf[CarbonDatasourceHadoopRelation]
+                       .carbonTable
+                       .getIndexTableNames(IndexType.SI.getIndexProviderName).size() > 0
+
+    }
+    if (hasSecondaryIndexTable && checkIfRuleNeedToBeApplied(plan)) {
       secondaryIndexOptimizer.transformFilterToJoin(plan, isProjectionNeeded(plan))
     } else {
       plan
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
index f39f18f..59c3c64 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
@@ -82,7 +82,8 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
    */
   private def rewritePlanForSecondaryIndex(filter: Filter,
       indexableRelation: CarbonDatasourceHadoopRelation, dbName: String,
-      cols: Seq[NamedExpression] = null, limitLiteral: Literal = null): LogicalPlan = {
+      cols: Seq[NamedExpression] = null, limitLiteral: Literal = null,
+      sortNodeForPushDown: Sort = null, pushDownNotNullFilter: Boolean = false): LogicalPlan = {
     var originalFilterAttributes: Set[String] = Set.empty
     var filterAttributes: Set[String] = Set.empty
     var matchingIndexTables: Seq[String] = Seq.empty
@@ -94,7 +95,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     // Removed is Not Null filter from all filters and other attributes are selected
     // isNotNull filter will return all the unique values except null from table,
     // For High Cardinality columns, this filter is of no use, hence skipping it.
-    removeIsNotNullAttribute(filter.condition) collect {
+    removeIsNotNullAttribute(filter.condition, pushDownNotNullFilter) collect {
       case attr: AttributeReference =>
         filterAttributes = filterAttributes. +(attr.name.toLowerCase)
     }
@@ -184,9 +185,10 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
 
       val filterTree: SIFilterPushDownOperation = null
       val newSIFilterTree = createIndexTableFilterCondition(
-          filterTree,
-          filter.copy(filter.condition, filter.child).condition,
-          indexTableToColumnsMapping)
+        filterTree,
+        filter.copy(filter.condition, filter.child).condition,
+        indexTableToColumnsMapping,
+        pushDownNotNullFilter)
       val indexTablesDF: DataFrame = newSIFilterTree._3 match {
         case Some(tableName) =>
           // flag to check whether apply limit literal on the filter push down condition or not
@@ -203,7 +205,8 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
             indexTableToLogicalRelationMapping,
             originalFilterAttributes,
             limitLiteral,
-            checkAndApplyLimitLiteral)
+            checkAndApplyLimitLiteral,
+            sortNodeForPushDown)
           dataFrameWithAttributes._1
         case _ =>
           null
@@ -258,7 +261,13 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
       indexTableToLogicalRelationMapping: mutable.Map[String, LogicalPlan],
       originalFilterAttributes: Set[String],
       limitLiteral: Literal,
-      checkAndAddLimitLiteral: Boolean = false): (DataFrame, Set[String]) = {
+      checkAndAddLimitLiteral: Boolean = false,
+      sortNode: Sort): (DataFrame, Set[String]) = {
+    val sortColumns = if (sortNode != null) {
+      sortNode.order.map(_.child.asInstanceOf[AttributeReference].name.toLowerCase()).toSet
+    } else {
+      Set.empty
+    }
     siFilterPushDownTree match {
       case SIUnaryFilterPushDownOperation(tableName, filterCondition) =>
         val attributeMap = indexTableAttributeMap.get(tableName).get
@@ -284,15 +293,46 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
         // Add Filter on logicalRelation
         var planTransform: LogicalPlan = Filter(indexTableFilter,
           indexTableToLogicalRelationMapping(tableName))
+        var needPushDown = false
+        var addLimit = checkAndAddLimitLiteral
+        if (sortNode != null && 
(filterAttributes.intersect(originalFilterAttributes)
+                                   .size == originalFilterAttributes.size)) {
+          needPushDown = true
+        } else if (filterAttributes.intersect(originalFilterAttributes)
+                     .size != originalFilterAttributes.size) {
+          addLimit = false
+        }
+        var sortAttr: Seq[AttributeReference] = Seq.empty
+        if (needPushDown) {
+          val plan = indexTableToLogicalRelationMapping(tableName)
+          plan collect {
+            case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+              sortColumns.foreach { x =>
+                sortAttr = sortAttr :+ attributeMap(x)
+              }
+          }
+          positionReference = positionReference ++ sortAttr
+        }
         // Add PositionReference Projection on Filter
         planTransform = Project(positionReference, planTransform)
+        if (needPushDown) {
+          var newSortOrder: Seq[SortOrder] = Seq.empty
+          var i = 0
+          sortNode.order.foreach { sortOrder =>
+            newSortOrder = newSortOrder :+ SortOrder(sortAttr(i), sortOrder.direction)
+            i = i + 1
+          }
+          planTransform = Limit(limitLiteral, Sort(newSortOrder, sortNode.global, planTransform))
+          // limit is already added, no need to add again.
+          addLimit = false
+        }
         var indexTableDf = createDF(sparkSession, planTransform)
         // When all the filter columns are joined from index table,
         // limit can be pushed down before grouping last index table as the
         // number of records selected will definitely return at least 1 record
         // NOTE: flag checkAndAddLimitLiteral will be true only when the complete filter tree
         // contains only one node which is a unary node
-        val indexLogicalPlan = if (checkAndAddLimitLiteral) {
+        val indexLogicalPlan = if (addLimit) {
           if (limitLiteral != null &&
               filterAttributes.intersect(originalFilterAttributes)
                 .size == originalFilterAttributes.size) {
@@ -315,14 +355,16 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
           indexJoinedFilterAttributes,
           indexTableToLogicalRelationMapping,
           originalFilterAttributes,
-          limitLiteral)
+          limitLiteral,
+          sortNode = sortNode)
        val (rightOperationDataFrame, indexFilterAttributesRight) = createIndexFilterDataFrame(
           rightOperation,
           indexTableAttributeMap,
           indexFilterAttributesLeft,
           indexTableToLogicalRelationMapping,
           originalFilterAttributes,
-          limitLiteral)
+          limitLiteral,
+          sortNode = sortNode)
 
         // create new data frame by applying join or union based on nodeType
         val newDFAfterUnionOrJoin = applyUnionOrJoinOnDataFrames(nodeType,
@@ -399,15 +441,16 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     allIndexTablesDF
   }
 
-  private def removeIsNotNullAttribute(condition: Expression): Expression = {
+  private def removeIsNotNullAttribute(condition: Expression,
+      pushDownNotNullFilter: Boolean): Expression = {
     val isPartialStringEnabled = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.ENABLE_SI_LOOKUP_PARTIALSTRING,
         CarbonCommonConstants.ENABLE_SI_LOOKUP_PARTIALSTRING_DEFAULT)
       .equalsIgnoreCase("true")
     condition transform {
-      case IsNotNull(child: AttributeReference) => Literal(true)
       // Like is possible only if user provides _ in between the string
       // _ in like means any single character wild card check.
+      case IsNotNull(child: AttributeReference) => Literal(!pushDownNotNullFilter)
       case plan if (CarbonHiveIndexMetadataUtil.checkNIUDF(plan)) => Literal(true)
       case Like(left: AttributeReference, right: Literal) if (!isPartialStringEnabled) => Literal(
         true)
@@ -456,7 +499,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
    */
   private def isConditionColumnInIndexTable(condition: Expression,
       indexTableColumnsToTableMapping: mutable.Map[String, Set[String]],
-      pushDownRequired: Boolean): Option[String] = {
+      pushDownRequired: Boolean, pushDownNotNullFilter: Boolean): Option[String] = {
     // In case of Like Filter in OR, both the conditions should not be transformed
     // In case of like filter in And, only like filter should be removed and
     // other filter should be transformed with index table
@@ -468,10 +511,10 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
 
     var tableName: Option[String] = None
     val doNotPushToSI = condition match {
+      case IsNotNull(child: AttributeReference) => !pushDownNotNullFilter
       case Not(EqualTo(left: AttributeReference, right: Literal)) => true
       case Not(Like(left: AttributeReference, right: Literal)) => true
       case Not(In(left: AttributeReference, right: Seq[Expression])) => true
-      case IsNotNull(child: AttributeReference) => true
       case Like(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
       case EndsWith(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
       case Contains(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
@@ -514,7 +557,8 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
    */
   private def createIndexTableFilterCondition(filterTree: SIFilterPushDownOperation,
       condition: Expression,
-      indexTableToColumnsMapping: mutable.Map[String, Set[String]]):
+      indexTableToColumnsMapping: mutable.Map[String, Set[String]],
+      pushDownNotNullFilter: Boolean):
   (SIFilterPushDownOperation, Expression, Option[String]) = {
     condition match {
       case or@Or(left, right) =>
@@ -522,12 +566,12 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
           createIndexTableFilterCondition(
             filterTree,
             left,
-            indexTableToColumnsMapping)
+            indexTableToColumnsMapping, pushDownNotNullFilter)
         val (newSIFilterTreeRight, newRight, tableNameRight) =
           createIndexTableFilterCondition(
             filterTree,
             right,
-            indexTableToColumnsMapping)
+            indexTableToColumnsMapping, pushDownNotNullFilter)
 
         (tableNameLeft, tableNameRight) match {
           case (Some(tableLeft), Some(tableRight)) =>
@@ -565,12 +609,14 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
           createIndexTableFilterCondition(
             filterTree,
             left,
-            indexTableToColumnsMapping)
+            indexTableToColumnsMapping,
+            pushDownNotNullFilter)
         val (newSIFilterTreeRight, newRight, tableNameRight) =
           createIndexTableFilterCondition(
             filterTree,
             right,
-            indexTableToColumnsMapping)
+            indexTableToColumnsMapping,
+            pushDownNotNullFilter)
         (tableNameLeft, tableNameRight) match {
           case (Some(tableLeft), Some(tableRight)) =>
             // push down both left and right condition if both left and right columns have index
@@ -605,7 +651,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
         }
         val tableName = isConditionColumnInIndexTable(condition,
           indexTableToColumnsMapping,
-          isPartialStringEnabled)
+          isPartialStringEnabled, pushDownNotNullFilter = pushDownNotNullFilter)
        // create a node if condition can be pushed down else return the same filterTree
         val newFilterTree = tableName match {
           case Some(table) =>
@@ -696,21 +742,71 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
       .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
     val transformChild = false
     var addProjection = needProjection
+    // to store the sort node per query
+    var sortNodeForPushDown: Sort = null
+    // to store the limit literal per query
+    var limitLiteral: Literal = null
+    // by default do not push down notNull filter,
+    // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
+    var pushDownNotNullFilter: Boolean = false
     val transformedPlan = transformPlan(plan, {
-      case union@Union(children) =>
+      case union@Union(_) =>
         // In case of Union, Extra Project has to be added to the Plan. Because if left table is
         // pushed to SI and right table is not pushed, then Output Attribute mismatch will happen
         addProjection = true
         (union, true)
-      case sort@Sort(order, global, plan) =>
+      case sort@Sort(_, _, _) =>
         addProjection = true
         (sort, true)
-      case filter@Filter(condition, logicalRelation@MatchIndexableRelation(indexableRelation))
+      case limit@Limit(literal: Literal, sort@Sort(_, _, child)) =>
+        child match {
+          case filter: Filter =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, filter)) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case p: Project if p.child.isInstanceOf[Filter] =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+              sort,
+              p.child.asInstanceOf[Filter])) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case _ =>
+        }
+        (limit, transformChild)
+      case limit@Limit(literal: Literal, _@Project(_, sort@Sort(_, _, child))) =>
+        child match {
+          case f: Filter =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, f)) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case p: Project if (p.child.isInstanceOf[Filter]) =>
+            if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+              sort,
+              p.child.asInstanceOf[Filter])) {
+              sortNodeForPushDown = sort
+              limitLiteral = literal
+              pushDownNotNullFilter = true
+            }
+          case _ =>
+        }
+        (limit, transformChild)
+      case filter@Filter(condition, _@MatchIndexableRelation(indexableRelation))
         if !condition.isInstanceOf[IsNotNull] &&
            CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
-        val reWrittenPlan = rewritePlanForSecondaryIndex(filter, indexableRelation,
+        val reWrittenPlan = rewritePlanForSecondaryIndex(
+          filter,
+          indexableRelation,
           filter.child.asInstanceOf[LogicalRelation].relation
-            .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName)
+            .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName,
+          limitLiteral = limitLiteral,
+          sortNodeForPushDown = sortNodeForPushDown,
+          pushDownNotNullFilter = pushDownNotNullFilter)
         if (reWrittenPlan.isInstanceOf[Join]) {
           if (pushDownJoinEnabled && !addProjection) {
             (reWrittenPlan, transformChild)
@@ -721,12 +817,18 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
           (filter, transformChild)
         }
       case projection@Project(cols, filter@Filter(condition,
-      logicalRelation@MatchIndexableRelation(indexableRelation)))
+      _@MatchIndexableRelation(indexableRelation)))
         if !condition.isInstanceOf[IsNotNull] &&
            CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
-        val reWrittenPlan = rewritePlanForSecondaryIndex(filter, indexableRelation,
+        val reWrittenPlan = rewritePlanForSecondaryIndex(
+          filter,
+          indexableRelation,
           filter.child.asInstanceOf[LogicalRelation].relation
-            .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName, cols)
+            .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName,
+          cols,
+          limitLiteral = limitLiteral,
+          sortNodeForPushDown = sortNodeForPushDown,
+          pushDownNotNullFilter = pushDownNotNullFilter)
         // If Index table is matched, join plan will be returned.
         // Adding projection over join to return only selected columns from query.
         // Else all columns from left & right table will be returned in output columns
@@ -744,7 +846,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
       // last index table, as number of records returned after join where unique and it will
       // definitely return at least 1 record.
       case limit@Limit(literal: Literal,
-      filter@Filter(condition, logicalRelation@MatchIndexableRelation(indexableRelation)))
+      filter@Filter(condition, _@MatchIndexableRelation(indexableRelation)))
         if !condition.isInstanceOf[IsNotNull] &&
            CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
         val carbonRelation = filter.child.asInstanceOf[LogicalRelation].relation
@@ -771,7 +873,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
           (limit, transformChild)
         }
       case limit@Limit(literal: Literal, projection@Project(cols, filter@Filter(condition,
-      logicalRelation@MatchIndexableRelation(indexableRelation))))
+      _@MatchIndexableRelation(indexableRelation))))
         if !condition.isInstanceOf[IsNotNull] &&
            CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
         val carbonRelation = filter.child.asInstanceOf[LogicalRelation].relation
@@ -836,6 +938,40 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
     }
   }
 
+  private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+      filter: Filter): Boolean = {
+    val filterAttributes = filter.condition collect {
+      case attr: AttributeReference => attr.name.toLowerCase
+    }
+    val parentTableRelation = MatchIndexableRelation.unapply(filter.child).get
+    val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+      filterAttributes.toSet.asJava,
+      CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava).asJava)
+      .asScala
+    val databaseName = parentTableRelation.carbonRelation.databaseName
+    // filter out all the index tables which are disabled
+    val enabledMatchingIndexTables = matchingIndexTables
+      .filter(table => {
+        sparkSession.sessionState.catalog
+          .getTableMetadata(TableIdentifier(table,
+            Some(databaseName))).storage
+          .properties
+          .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+      })
+    // 1. check if only one SI matches for the filter columns
+    if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size 
== 1) {
+      // 2. check if all the sort columns is in SI
+      val sortColumns = sort
+        .order
+        .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+        .toSet
+      val indexCarbonTable = CarbonEnv
+        .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+      return sortColumns.forall { x => indexCarbonTable.getColumnByName(x) != 
null }
+    }
+    false
+  }
+
 }
 
 object MatchIndexableRelation {
