This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f1d2a4  [CARBONDATA-4317] Fix TPCDS performance issues
0f1d2a4 is described below

commit 0f1d2a45e5f614fd123bd734ab37d7e453c21344
Author: Indhumathi27 <[email protected]>
AuthorDate: Tue Dec 7 20:32:05 2021 +0530

    [CARBONDATA-4317] Fix TPCDS performance issues
    
    Why is this PR needed?
    The following issues has degraded the TPCDS query performance
    1. If dynamic filters is not present in partitionFilters Set, then that 
filter is skipped, to pushdown to spark.
    2. In some cases, some nodes like Exchange / Shuffle is not reused, because 
the CarbonDataSourceSCan plan is not mached
    3. While accessing the metadata on the canonicalized plan throws NPE
    
    What changes were proposed in this PR?
    1. Check if dynamic filters is present in PartitionFilters set. If not, 
pushdown the filter
    2. Match the plans, by converting them to canonicalized and by normalising 
the expressions
    3. Move variables used in metadata(), to avoid NPE while comparing plans
    
    This closes #4241
---
 .../execution/strategy/CarbonDataSourceScan.scala  | 49 ++++++++++++++++++----
 .../execution/strategy/CarbonSourceStrategy.scala  |  6 +--
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |  4 +-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |  4 +-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |  7 +++-
 5 files changed, 54 insertions(+), 16 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 2e1bb96..31685b0 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -31,7 +31,6 @@ import 
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 
 import org.apache.carbondata.core.metadata.schema.BucketingInfo
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.hadoop.CarbonProjection
 
@@ -44,7 +43,6 @@ case class CarbonDataSourceScan(
     output: Seq[Attribute],
     partitionFiltersWithoutDpp: Seq[SparkExpression],
     dataFilters: Seq[SparkExpression],
-    @transient readCommittedScope: ReadCommittedScope,
     @transient pushedDownProjection: CarbonProjection,
     @transient pushedDownFilters: Seq[Expression],
     directScanSupport: Boolean,
@@ -64,6 +62,10 @@ case class CarbonDataSourceScan(
     partitionFiltersWithDpp,
     segmentIds) {
 
+  val pushDownFiltersStr: String = 
seqToString(pushedDownFilters.map(_.getStatement))
+
+  val projectionColStr: String = 
seqToString(pushedDownProjection.getAllColumns)
+
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
     val info: BucketingInfo = relation.carbonTable.getBucketingInfo
     if (info != null) {
@@ -91,15 +93,18 @@ case class CarbonDataSourceScan(
     }
   }
 
+  def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
+
   override lazy val metadata: Map[String, String] = {
-    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val metadata =
       Map(
-        "ReadSchema" -> seqToString(pushedDownProjection.getAllColumns),
+        "ReadSchema" -> projectionColStr,
         "Batched" -> supportsBatchOrColumnar.toString,
         "DirectScan" -> (supportsBatchOrColumnar && 
directScanSupport).toString,
-        "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
-    if (relation.carbonTable.isHivePartitionTable) {
+        "PushedFilters" -> pushDownFiltersStr)
+    // if plan is canonicalized, then filter expressions will be normalized. 
In that case,
+    // skip adding selected partitions to metadata
+    if (!this.isCanonicalizedPlan && 
relation.carbonTable.isHivePartitionTable) {
       metadata + ("PartitionFilters" -> seqToString(partitionFiltersWithDpp)) +
         ("PartitionCount" -> selectedPartitions.size.toString)
     } else {
@@ -142,14 +147,40 @@ case class CarbonDataSourceScan(
       outputAttibutesAfterNormalizingExpressionIds,
       QueryPlan.normalizePredicates(partitionFiltersWithoutDpp, output),
       QueryPlan.normalizePredicates(dataFilters, output),
-      null,
-      null,
+      pushedDownProjection,
       Seq.empty,
       directScanSupport,
       extraRDD,
       tableIdentifier,
-      selectedCatalogPartitions,
+      Seq.empty,
       QueryPlan.normalizePredicates(partitionFiltersWithDpp, output)
     )
   }
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case scan: CarbonDataSourceScan =>
+        if (scan.relation == relation) {
+          var currentPlan = this
+          var otherPlan = scan
+          // In some cases, the plans for comparison is not canonicalized. In 
that case, comparing
+          // pushedDownFilters will not match, since objects are different. Do 
canonicalize
+          // the plans before comparison, which can reuse exchange for better 
performance
+          if (pushedDownFilters.nonEmpty && scan.pushedDownFilters.nonEmpty) {
+            otherPlan = scan.canonicalized.asInstanceOf[CarbonDataSourceScan]
+            currentPlan = this.canonicalized.asInstanceOf[CarbonDataSourceScan]
+          }
+          // compare metadata, partition filter and data filter expressions
+          currentPlan.metadata == otherPlan.metadata &&
+          currentPlan.partitionFiltersWithDpp.toList.asJava
+            .containsAll(otherPlan.partitionFiltersWithDpp.toList.asJava) &&
+          (currentPlan.dataFilters == otherPlan.dataFilters ||
+           QueryPlan.normalizePredicates(currentPlan.dataFilters, 
currentPlan.output)
+           == QueryPlan.normalizePredicates(otherPlan.dataFilters, 
otherPlan.output))
+        } else {
+          false
+        }
+      case _ => false
+    }
+  }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index 8a0779f..824c7fb 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -158,9 +158,10 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
         SparkSession.getActiveSession.get,
         relation.catalogTable.get.identifier
       )
-      // remove dynamic partition filter from predicates
-      filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet, 
allPredicates)
     }
+    // remove dynamic partition filter from predicates
+    filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet,
+      allPredicates, partitionsFilter)
     val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     val projects = rawProjects.map {p =>
       p.transform {
@@ -232,7 +233,6 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
       output,
       partitionsFilter.filterNot(SubqueryExpression.hasSubquery),
       handledPredicates,
-      readCommittedScope,
       getCarbonProjection(relationPredicates, requiredColumns, projects),
       pushedFilters,
       directScanSupport,
diff --git 
a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
 
b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 81ff362..2c3483b 100644
--- 
a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ 
b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -173,7 +173,9 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
       }
   }
 
-  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): 
Seq[Expression] = {
+  def getDataFilter(partitionSet: AttributeSet,
+      filter: Seq[Expression],
+      partitionFilter: Seq[Expression]): Seq[Expression] = {
     filter
   }
 
diff --git 
a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
 
b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
index ec2352d..89212a6 100644
--- 
a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ 
b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -207,7 +207,9 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
       }
   }
 
-  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): 
Seq[Expression] = {
+  def getDataFilter(partitionSet: AttributeSet,
+      filter: Seq[Expression],
+      partitionFilter: Seq[Expression]): Seq[Expression] = {
     filter
   }
 
diff --git 
a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
 
b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
index eaceb85..9ab7888 100644
--- 
a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ 
b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -180,9 +180,12 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
       }
   }
 
-  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): 
Seq[Expression] = {
+  def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression],
+      partitionFilter: Seq[Expression]): Seq[Expression] = {
     filter.filter {
-      case _: DynamicPruningSubquery => false
+      case dp: DynamicPruningSubquery =>
+        // if filter does not exists in partition filter, then push down the 
filter to spark
+        !partitionFilter.exists(_.semanticEquals(dp))
       case _ => true
     }
   }

Reply via email to