This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new a59aec7 [CARBONDATA-3922] Support order by limit push down for secondary index queries
a59aec7 is described below
commit a59aec7e33666b827afc887b51823fa6266ce8fe
Author: ajantha-bhat <[email protected]>
AuthorDate: Thu Jul 23 17:40:19 2020 +0530
[CARBONDATA-3922] Support order by limit push down for secondary index queries
Why is this PR needed?
a) Limit pushdown for SI is already supported, but the limit was pushed down
even when the order by column is not an SI column. This needs to be fixed.
b) When a limit is present and the order by column and all the filter columns
are SI columns, order by + limit can be pushed down. This reduces the SI
output and the scan time on the main table.
c) The SI transformation rule is applied even when no relation in the plan
contains an SI.
What changes were proposed in this PR?
a) Block limit push down if the order by column is not an SI column.
b) When a limit is present and the order by column and all the filter columns
are SI columns, push down order by + limit.
c) Apply the SI transformation rule only when some relation in the plan
contains an SI.
This closes #3861
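
For context, the plan shape this change targets is a limit over a sort over a
filter, optionally with projections in between. Below is a minimal sketch of
such a pattern match against Spark's Catalyst API (illustrative only; the
committed logic lives in CarbonSecondaryIndexOptimizer, and OrderByLimitShape
is a hypothetical helper, not part of this patch):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LogicalPlan, Project, Sort}

    object OrderByLimitShape {
      // Matches Limit -> [Project] -> Sort -> [Project] -> Filter and returns the
      // pieces needed for the pushdown decision: limit literal, sort node, filter.
      def unapply(plan: LogicalPlan): Option[(Literal, Sort, Filter)] = plan match {
        case Limit(l: Literal, s @ Sort(_, _, f: Filter)) => Some((l, s, f))
        case Limit(l: Literal, s @ Sort(_, _, Project(_, f: Filter))) => Some((l, s, f))
        case Limit(l: Literal, Project(_, s @ Sort(_, _, f: Filter))) => Some((l, s, f))
        case Limit(l: Literal, Project(_, s @ Sort(_, _, Project(_, f: Filter)))) => Some((l, s, f))
        case _ => None
      }
    }

Pushdown is then attempted only when the filter and sort columns are all
covered by a single secondary index, as checked by the new
checkIfPushDownOrderByLimitAndNotNullFilter method in the diff below.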
---
.../secondaryindex/TestSIWithSecondryIndex.scala | 66 ++++++-
.../optimizer/CarbonSITransformationRule.scala | 15 +-
.../optimizer/CarbonSecondaryIndexOptimizer.scala | 196 +++++++++++++++++----
3 files changed, 242 insertions(+), 35 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 3986783..47a2110 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -18,9 +18,10 @@ package org.apache.carbondata.spark.testsuite.secondaryindex
import scala.collection.JavaConverters._
+import org.apache.commons.lang3.StringUtils
+
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
-.isFilterPushedDownToSI;
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
import org.apache.spark.sql.{CarbonEnv, Row}
import org.scalatest.BeforeAndAfterAll
@@ -29,7 +30,6 @@ import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentSta
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.exception.ProcessMetaDataException
-
import org.apache.spark.sql.test.util.QueryTest
class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
@@ -319,6 +319,66 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists maintable")
}
+ test("test SI order by limit push down") {
+ sql("drop table if exists table2")
+ sql("CREATE TABLE `table2` (`imsi` STRING, `carno` STRING, `longitude`
STRING, `city` " +
+ "STRING, `starttime` BIGINT, `endtime` BIGINT) STORED AS carbondata
TBLPROPERTIES" +
+ "('sort_scope'='global_sort','sort_columns'='starttime')")
+ sql("create index table2_index1 on table table2(carno, longitude,
starttime) as 'carbondata'")
+ sql("create index table2_index2 on table table2(city) as 'carbondata'")
+ sql("insert into table2 select 'aa','ka14','ll','abc',23,24 ")
+ sql("insert into table2 select 'aa','ka14','ll','xyz',25,26 ")
+
+ // Allow order by and limit pushdown as all the filter and order by column
is in SI
+ // a. For selected projections
+ var plan = sql(
+ "explain SELECT imsi FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is
not null ORDER BY " +
+ "STARTTIME LIMIT 1")
+ .collect()(0)
+ .toString()
+ assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 2)
+
+ // b. For all projections
+ plan = sql(
+ "explain SELECT * FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not
null ORDER BY " +
+ "STARTTIME LIMIT 1")
+ .collect()(0)
+ .toString()
+ assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 2)
+
+ // Don't allow orderby and limit pushdown as order by column is not an SI
column
+ plan = sql(
+ "explain SELECT * FROM table2 WHERE CARNO = 'ka14' AND LONGITUDE is not
null ORDER BY " +
+ "endtime LIMIT 1")
+ .collect()(0)
+ .toString()
+ assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 1)
+
+ // Don't allow orderby and limit pushdown as filter column is not an SI
column
+ plan = sql(
+ "explain SELECT * FROM table2 WHERE imsi = 'aa' AND LONGITUDE is not
null ORDER BY " +
+ "STARTTIME LIMIT 1")
+ .collect()(0)
+ .toString()
+ assert(StringUtils.countMatches(plan, "TakeOrderedAndProject") == 1)
+
+ // just NotEqual to should not be pushed down to SI without order by
+ plan = sql(
+ "explain SELECT * FROM table2 WHERE CARNO != 'ka14' ")
+ .collect()(0)
+ .toString()
+ assert(!plan.contains("table2_index1"))
+
+ // NotEqual to should not be pushed down to SI without order by in case of
multiple tables also
+ plan = sql(
+ "explain SELECT * FROM table2 WHERE CARNO = 'ka14' and CITY != 'ddd' ")
+ .collect()(0)
+ .toString()
+ assert(!plan.contains("table2_index2") && plan.contains("table2_index1"))
+
+ sql("drop table table2")
+ }
+
override def afterAll {
sql("drop index si_altercolumn on table_WithSIAndAlter")
sql("drop table if exists table_WithSIAndAlter")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
index c802060..dba8ff2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSITransformationRule.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.secondaryindex.optimizer
import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonUtils, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonUtils, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -28,6 +28,7 @@ import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.index.IndexType
/**
* Rule for rewriting plan if query has a filter on index table column
@@ -41,7 +42,17 @@ class CarbonSITransformationRule(sparkSession: SparkSession)
new CarbonSecondaryIndexOptimizer(sparkSession)
def apply(plan: LogicalPlan): LogicalPlan = {
- if (checkIfRuleNeedToBeApplied(plan)) {
+ var hasSecondaryIndexTable = false
+ plan.collect {
+ case l: LogicalRelation if (!hasSecondaryIndexTable &&
+ l.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) =>
+ hasSecondaryIndexTable = l.relation
+ .asInstanceOf[CarbonDatasourceHadoopRelation]
+ .carbonTable
+ .getIndexTableNames(IndexType.SI.getIndexProviderName).size() > 0
+
+ }
+ if (hasSecondaryIndexTable && checkIfRuleNeedToBeApplied(plan)) {
secondaryIndexOptimizer.transformFilterToJoin(plan, isProjectionNeeded(plan))
} else {
plan
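
The guard added above walks the plan once and skips the rewrite when no
relation is backed by a secondary index. A condensed sketch of the same idea,
with a hasSI predicate standing in for the
carbonTable.getIndexTableNames(IndexType.SI.getIndexProviderName) lookup:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    // True if any leaf relation in the plan carries a secondary index.
    // collectFirst short-circuits the traversal on the first match.
    def planHasSecondaryIndex(plan: LogicalPlan, hasSI: LogicalRelation => Boolean): Boolean =
      plan.collectFirst { case l: LogicalRelation if hasSI(l) => () }.isDefined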
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
index f39f18f..59c3c64 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
@@ -82,7 +82,8 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
*/
private def rewritePlanForSecondaryIndex(filter: Filter,
indexableRelation: CarbonDatasourceHadoopRelation, dbName: String,
- cols: Seq[NamedExpression] = null, limitLiteral: Literal = null): LogicalPlan = {
+ cols: Seq[NamedExpression] = null, limitLiteral: Literal = null,
+ sortNodeForPushDown: Sort = null, pushDownNotNullFilter: Boolean = false): LogicalPlan = {
var originalFilterAttributes: Set[String] = Set.empty
var filterAttributes: Set[String] = Set.empty
var matchingIndexTables: Seq[String] = Seq.empty
@@ -94,7 +95,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
// Removed is Not Null filter from all filters and other attributes are selected
// isNotNull filter will return all the unique values except null from table,
// For High Cardinality columns, this filter is of no use, hence skipping it.
- removeIsNotNullAttribute(filter.condition) collect {
+ removeIsNotNullAttribute(filter.condition, pushDownNotNullFilter) collect {
case attr: AttributeReference =>
filterAttributes = filterAttributes. +(attr.name.toLowerCase)
}
@@ -184,9 +185,10 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
val filterTree: SIFilterPushDownOperation = null
val newSIFilterTree = createIndexTableFilterCondition(
- filterTree,
- filter.copy(filter.condition, filter.child).condition,
- indexTableToColumnsMapping)
+ filterTree,
+ filter.copy(filter.condition, filter.child).condition,
+ indexTableToColumnsMapping,
+ pushDownNotNullFilter)
val indexTablesDF: DataFrame = newSIFilterTree._3 match {
case Some(tableName) =>
// flag to check whether apply limit literal on the filter push down condition or not
@@ -203,7 +205,8 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
indexTableToLogicalRelationMapping,
originalFilterAttributes,
limitLiteral,
- checkAndApplyLimitLiteral)
+ checkAndApplyLimitLiteral,
+ sortNodeForPushDown)
dataFrameWithAttributes._1
case _ =>
null
@@ -258,7 +261,13 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
indexTableToLogicalRelationMapping: mutable.Map[String, LogicalPlan],
originalFilterAttributes: Set[String],
limitLiteral: Literal,
- checkAndAddLimitLiteral: Boolean = false): (DataFrame, Set[String]) = {
+ checkAndAddLimitLiteral: Boolean = false,
+ sortNode: Sort): (DataFrame, Set[String]) = {
+ val sortColumns = if (sortNode != null) {
+ sortNode.order.map(_.child.asInstanceOf[AttributeReference].name.toLowerCase()).toSet
+ } else {
+ Set.empty
+ }
siFilterPushDownTree match {
case SIUnaryFilterPushDownOperation(tableName, filterCondition) =>
val attributeMap = indexTableAttributeMap.get(tableName).get
@@ -284,15 +293,46 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
// Add Filter on logicalRelation
var planTransform: LogicalPlan = Filter(indexTableFilter,
indexTableToLogicalRelationMapping(tableName))
+ var needPushDown = false
+ var addLimit = checkAndAddLimitLiteral
+ if (sortNode != null &&
(filterAttributes.intersect(originalFilterAttributes)
+ .size == originalFilterAttributes.size)) {
+ needPushDown = true
+ } else if (filterAttributes.intersect(originalFilterAttributes)
+ .size != originalFilterAttributes.size) {
+ addLimit = false
+ }
+ var sortAttr: Seq[AttributeReference] = Seq.empty
+ if (needPushDown) {
+ val plan = indexTableToLogicalRelationMapping(tableName)
+ plan collect {
+ case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ sortColumns.foreach { x =>
+ sortAttr = sortAttr :+ attributeMap(x)
+ }
+ }
+ positionReference = positionReference ++ sortAttr
+ }
// Add PositionReference Projection on Filter
planTransform = Project(positionReference, planTransform)
+ if (needPushDown) {
+ var newSortOrder: Seq[SortOrder] = Seq.empty
+ var i = 0
+ sortNode.order.foreach { sortOrder =>
+ newSortOrder = newSortOrder :+ SortOrder(sortAttr(i), sortOrder.direction)
+ i = i + 1
+ }
+ planTransform = Limit(limitLiteral, Sort(newSortOrder, sortNode.global, planTransform))
+ // limit is already added, no need to add again.
+ addLimit = false
+ }
var indexTableDf = createDF(sparkSession, planTransform)
// When all the filter columns are joined from index table,
// limit can be pushed down before grouping last index table as the
// number of records selected will definitely return at least 1 record
// NOTE: flag checkAndAddLimitLiteral will be true only when the complete filter tree
// contains only one node which is a unary node
- val indexLogicalPlan = if (checkAndAddLimitLiteral) {
+ val indexLogicalPlan = if (addLimit) {
if (limitLiteral != null &&
filterAttributes.intersect(originalFilterAttributes)
.size == originalFilterAttributes.size) {
@@ -315,14 +355,16 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
indexJoinedFilterAttributes,
indexTableToLogicalRelationMapping,
originalFilterAttributes,
- limitLiteral)
+ limitLiteral,
+ sortNode = sortNode)
val (rightOperationDataFrame, indexFilterAttributesRight) = createIndexFilterDataFrame(
rightOperation,
indexTableAttributeMap,
indexFilterAttributesLeft,
indexTableToLogicalRelationMapping,
originalFilterAttributes,
- limitLiteral)
+ limitLiteral,
+ sortNode = sortNode)
// create new data frame by applying join or union based on nodeType
val newDFAfterUnionOrJoin = applyUnionOrJoinOnDataFrames(nodeType,
@@ -399,15 +441,16 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
allIndexTablesDF
}
- private def removeIsNotNullAttribute(condition: Expression): Expression = {
+ private def removeIsNotNullAttribute(condition: Expression,
+ pushDownNotNullFilter: Boolean): Expression = {
val isPartialStringEnabled = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.ENABLE_SI_LOOKUP_PARTIALSTRING,
CarbonCommonConstants.ENABLE_SI_LOOKUP_PARTIALSTRING_DEFAULT)
.equalsIgnoreCase("true")
condition transform {
- case IsNotNull(child: AttributeReference) => Literal(true)
// Like is possible only if user provides _ in between the string
// _ in like means any single character wild card check.
+ case IsNotNull(child: AttributeReference) => Literal(!pushDownNotNullFilter)
case plan if (CarbonHiveIndexMetadataUtil.checkNIUDF(plan)) => Literal(true)
case Like(left: AttributeReference, right: Literal) if (!isPartialStringEnabled) => Literal(
true)
@@ -456,7 +499,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
*/
private def isConditionColumnInIndexTable(condition: Expression,
indexTableColumnsToTableMapping: mutable.Map[String, Set[String]],
- pushDownRequired: Boolean): Option[String] = {
+ pushDownRequired: Boolean, pushDownNotNullFilter: Boolean): Option[String] = {
// In case of Like Filter in OR, both the conditions should not be transformed
// In case of like filter in And, only like filter should be removed and
// other filter should be transformed with index table
@@ -468,10 +511,10 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
var tableName: Option[String] = None
val doNotPushToSI = condition match {
+ case IsNotNull(child: AttributeReference) => !pushDownNotNullFilter
case Not(EqualTo(left: AttributeReference, right: Literal)) => true
case Not(Like(left: AttributeReference, right: Literal)) => true
case Not(In(left: AttributeReference, right: Seq[Expression])) => true
- case IsNotNull(child: AttributeReference) => true
case Like(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
case EndsWith(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
case Contains(left: AttributeReference, right: Literal) if (!pushDownRequired) => true
@@ -514,7 +557,8 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
*/
private def createIndexTableFilterCondition(filterTree: SIFilterPushDownOperation,
condition: Expression,
- indexTableToColumnsMapping: mutable.Map[String, Set[String]]):
+ indexTableToColumnsMapping: mutable.Map[String, Set[String]],
+ pushDownNotNullFilter: Boolean):
(SIFilterPushDownOperation, Expression, Option[String]) = {
condition match {
case or@Or(left, right) =>
@@ -522,12 +566,12 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
createIndexTableFilterCondition(
filterTree,
left,
- indexTableToColumnsMapping)
+ indexTableToColumnsMapping, pushDownNotNullFilter)
val (newSIFilterTreeRight, newRight, tableNameRight) =
createIndexTableFilterCondition(
filterTree,
right,
- indexTableToColumnsMapping)
+ indexTableToColumnsMapping, pushDownNotNullFilter)
(tableNameLeft, tableNameRight) match {
case (Some(tableLeft), Some(tableRight)) =>
@@ -565,12 +609,14 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
createIndexTableFilterCondition(
filterTree,
left,
- indexTableToColumnsMapping)
+ indexTableToColumnsMapping,
+ pushDownNotNullFilter)
val (newSIFilterTreeRight, newRight, tableNameRight) =
createIndexTableFilterCondition(
filterTree,
right,
- indexTableToColumnsMapping)
+ indexTableToColumnsMapping,
+ pushDownNotNullFilter)
(tableNameLeft, tableNameRight) match {
case (Some(tableLeft), Some(tableRight)) =>
// push down both left and right condition if both left and right columns have index
@@ -605,7 +651,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
}
val tableName = isConditionColumnInIndexTable(condition,
indexTableToColumnsMapping,
- isPartialStringEnabled)
+ isPartialStringEnabled, pushDownNotNullFilter = pushDownNotNullFilter)
// create a node if condition can be pushed down else return the same filterTree
val newFilterTree = tableName match {
case Some(table) =>
@@ -696,21 +742,71 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
.getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
val transformChild = false
var addProjection = needProjection
+ // to store the sort node per query
+ var sortNodeForPushDown: Sort = null
+ // to store the limit literal per query
+ var limitLiteral: Literal = null
+ // by default do not push down notNull filter,
+ // but for order by limit push down, push down the notNull filter also, else we get wrong results.
+ var pushDownNotNullFilter: Boolean = false
val transformedPlan = transformPlan(plan, {
- case union@Union(children) =>
+ case union@Union(_) =>
// In case of Union, Extra Project has to be added to the Plan. Because if left table is
// pushed to SI and right table is not pushed, then Output Attribute mismatch will happen
addProjection = true
(union, true)
- case sort@Sort(order, global, plan) =>
+ case sort@Sort(_, _, _) =>
addProjection = true
(sort, true)
- case filter@Filter(condition, logicalRelation@MatchIndexableRelation(indexableRelation))
+ case limit@Limit(literal: Literal, sort@Sort(_, _, child)) =>
+ child match {
+ case filter: Filter =>
+ if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, filter)) {
+ sortNodeForPushDown = sort
+ limitLiteral = literal
+ pushDownNotNullFilter = true
+ }
+ case p: Project if p.child.isInstanceOf[Filter] =>
+ if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+ sort,
+ p.child.asInstanceOf[Filter])) {
+ sortNodeForPushDown = sort
+ limitLiteral = literal
+ pushDownNotNullFilter = true
+ }
+ case _ =>
+ }
+ (limit, transformChild)
+ case limit@Limit(literal: Literal, _@Project(_, sort@Sort(_, _, child))) =>
+ child match {
+ case f: Filter =>
+ if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, f)) {
+ sortNodeForPushDown = sort
+ limitLiteral = literal
+ pushDownNotNullFilter = true
+ }
+ case p: Project if (p.child.isInstanceOf[Filter]) =>
+ if (checkIfPushDownOrderByLimitAndNotNullFilter(literal,
+ sort,
+ p.child.asInstanceOf[Filter])) {
+ sortNodeForPushDown = sort
+ limitLiteral = literal
+ pushDownNotNullFilter = true
+ }
+ case _ =>
+ }
+ (limit, transformChild)
+ case filter@Filter(condition, _@MatchIndexableRelation(indexableRelation))
if !condition.isInstanceOf[IsNotNull] &&
CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
- val reWrittenPlan = rewritePlanForSecondaryIndex(filter, indexableRelation,
+ val reWrittenPlan = rewritePlanForSecondaryIndex(
+ filter,
+ indexableRelation,
filter.child.asInstanceOf[LogicalRelation].relation
- .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName)
+ .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName,
+ limitLiteral = limitLiteral,
+ sortNodeForPushDown = sortNodeForPushDown,
+ pushDownNotNullFilter = pushDownNotNullFilter)
if (reWrittenPlan.isInstanceOf[Join]) {
if (pushDownJoinEnabled && !addProjection) {
(reWrittenPlan, transformChild)
@@ -721,12 +817,18 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
(filter, transformChild)
}
case projection@Project(cols, filter@Filter(condition,
- logicalRelation@MatchIndexableRelation(indexableRelation)))
+ _@MatchIndexableRelation(indexableRelation)))
if !condition.isInstanceOf[IsNotNull] &&
CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
- val reWrittenPlan = rewritePlanForSecondaryIndex(filter, indexableRelation,
+ val reWrittenPlan = rewritePlanForSecondaryIndex(
+ filter,
+ indexableRelation,
filter.child.asInstanceOf[LogicalRelation].relation
- .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName, cols)
+ .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.databaseName,
+ cols,
+ limitLiteral = limitLiteral,
+ sortNodeForPushDown = sortNodeForPushDown,
+ pushDownNotNullFilter = pushDownNotNullFilter)
// If Index table is matched, join plan will be returned.
// Adding projection over join to return only selected columns from query.
// Else all columns from left & right table will be returned in output columns
@@ -744,7 +846,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
// last index table, as number of records returned after join were unique and it will
// definitely return at least 1 record.
case limit@Limit(literal: Literal,
- filter@Filter(condition, logicalRelation@MatchIndexableRelation(indexableRelation)))
+ filter@Filter(condition, _@MatchIndexableRelation(indexableRelation)))
if !condition.isInstanceOf[IsNotNull] &&
CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
val carbonRelation = filter.child.asInstanceOf[LogicalRelation].relation
@@ -771,7 +873,7 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
(limit, transformChild)
}
case limit@Limit(literal: Literal, projection@Project(cols, filter@Filter(condition,
- logicalRelation@MatchIndexableRelation(indexableRelation))))
+ _@MatchIndexableRelation(indexableRelation))))
if !condition.isInstanceOf[IsNotNull] &&
CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
val carbonRelation = filter.child.asInstanceOf[LogicalRelation].relation
@@ -836,6 +938,40 @@ class CarbonSecondaryIndexOptimizer(sparkSession: SparkSession) {
}
}
+ private def checkIfPushDownOrderByLimitAndNotNullFilter(literal: Literal, sort: Sort,
+ filter: Filter): Boolean = {
+ val filterAttributes = filter.condition collect {
+ case attr: AttributeReference => attr.name.toLowerCase
+ }
+ val parentTableRelation = MatchIndexableRelation.unapply(filter.child).get
+ val matchingIndexTables = CarbonCostBasedOptimizer.identifyRequiredTables(
+ filterAttributes.toSet.asJava,
+ CarbonIndexUtil.getSecondaryIndexes(parentTableRelation).mapValues(_.toList.asJava).asJava)
+ .asScala
+ val databaseName = parentTableRelation.carbonRelation.databaseName
+ // filter out all the index tables which are disabled
+ val enabledMatchingIndexTables = matchingIndexTables
+ .filter(table => {
+ sparkSession.sessionState.catalog
+ .getTableMetadata(TableIdentifier(table,
+ Some(databaseName))).storage
+ .properties
+ .getOrElse("isSITableEnabled", "true").equalsIgnoreCase("true")
+ })
+ // 1. check if only one SI matches for the filter columns
+ if (enabledMatchingIndexTables.nonEmpty && enabledMatchingIndexTables.size
== 1) {
+ // 2. check if all the sort columns are in SI
+ val sortColumns = sort
+ .order
+ .map(_.child.asInstanceOf[AttributeReference].name.toLowerCase())
+ .toSet
+ val indexCarbonTable = CarbonEnv
+ .getCarbonTable(Some(databaseName), enabledMatchingIndexTables.head)(sparkSession)
+ return sortColumns.forall { x => indexCarbonTable.getColumnByName(x) !=
null }
+ }
+ false
+ }
+
}
object MatchIndexableRelation {
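
The new checkIfPushDownOrderByLimitAndNotNullFilter helper gates the pushdown
on two conditions: the filter columns must resolve to exactly one enabled SI
table, and every order by column must exist in that table. Reduced to plain
Scala, the decision looks roughly like this (siTables is a hypothetical map
from enabled index table name to its column set, standing in for the catalog
lookups in the committed code):

    // Returns the single SI table eligible for order by + limit pushdown, if any.
    def eligibleIndexTable(
        sortColumns: Set[String],
        siTables: Map[String, Set[String]]): Option[String] =
      siTables.toSeq match {
        // exactly one enabled candidate, and it covers all the sort columns
        case Seq((table, columns)) if sortColumns.subsetOf(columns) => Some(table)
        case _ => None
      }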