This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 0f1d2a4 [CARBONDATA-4317] Fix TPCDS performance issues
0f1d2a4 is described below
commit 0f1d2a45e5f614fd123bd734ab37d7e453c21344
Author: Indhumathi27 <[email protected]>
AuthorDate: Tue Dec 7 20:32:05 2021 +0530
[CARBONDATA-4317] Fix TPCDS performance issues
Why is this PR needed?
The following issues have degraded TPCDS query performance:
1. If a dynamic filter is not present in the partitionFilters set, that
filter is still skipped instead of being pushed down to Spark.
2. In some cases, nodes like Exchange/Shuffle are not reused because the
CarbonDataSourceScan plans do not match.
3. Accessing metadata on the canonicalized plan throws an NPE.
What changes were proposed in this PR?
1. Check whether a dynamic filter is present in the partitionFilters set.
If not, push down the filter to Spark (see the sketch after the commit
message).
2. Match the plans by canonicalizing them and normalizing their expressions.
3. Move the values used in metadata() into class-level vals, to avoid an
NPE while comparing plans.
This closes #4241
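In essence, change 1 keeps a dynamic pruning filter for pushdown unless a
semantically equal partition filter already covers it. A condensed sketch of
the check (it mirrors the Spark 3.1 adapter hunk at the end of the diff
below; DynamicPruningSubquery and semanticEquals are Spark catalyst APIs):

    import org.apache.spark.sql.catalyst.expressions.{AttributeSet, DynamicPruningSubquery, Expression}

    def getDataFilter(
        partitionSet: AttributeSet,
        filter: Seq[Expression],
        partitionFilter: Seq[Expression]): Seq[Expression] = {
      filter.filter {
        // keep a dynamic pruning subquery as a data filter (pushed down to
        // Spark) only when no semantically equal partition filter handles it
        case dp: DynamicPruningSubquery =>
          !partitionFilter.exists(_.semanticEquals(dp))
        // every other filter is kept unconditionally, as before
        case _ => true
      }
    }

Previously every DynamicPruningSubquery was dropped here, so a dynamic
filter missing from partitionFilters was never applied at all.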
---
.../execution/strategy/CarbonDataSourceScan.scala | 49 ++++++++++++++++++----
.../execution/strategy/CarbonSourceStrategy.scala | 6 +--
.../apache/spark/sql/CarbonToSparkAdapter.scala | 4 +-
.../apache/spark/sql/CarbonToSparkAdapter.scala | 4 +-
.../apache/spark/sql/CarbonToSparkAdapter.scala | 7 +++-
5 files changed, 54 insertions(+), 16 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 2e1bb96..31685b0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.carbondata.core.metadata.schema.BucketingInfo
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.hadoop.CarbonProjection
@@ -44,7 +43,6 @@ case class CarbonDataSourceScan(
output: Seq[Attribute],
partitionFiltersWithoutDpp: Seq[SparkExpression],
dataFilters: Seq[SparkExpression],
- @transient readCommittedScope: ReadCommittedScope,
@transient pushedDownProjection: CarbonProjection,
@transient pushedDownFilters: Seq[Expression],
directScanSupport: Boolean,
@@ -64,6 +62,10 @@ case class CarbonDataSourceScan(
partitionFiltersWithDpp,
segmentIds) {
+ val pushDownFiltersStr: String = seqToString(pushedDownFilters.map(_.getStatement))
+
+ val projectionColStr: String = seqToString(pushedDownProjection.getAllColumns)
+
override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val info: BucketingInfo = relation.carbonTable.getBucketingInfo
if (info != null) {
@@ -91,15 +93,18 @@ case class CarbonDataSourceScan(
}
}
+ def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
+
override lazy val metadata: Map[String, String] = {
- def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
val metadata =
Map(
- "ReadSchema" -> seqToString(pushedDownProjection.getAllColumns),
+ "ReadSchema" -> projectionColStr,
"Batched" -> supportsBatchOrColumnar.toString,
"DirectScan" -> (supportsBatchOrColumnar &&
directScanSupport).toString,
- "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
- if (relation.carbonTable.isHivePartitionTable) {
+ "PushedFilters" -> pushDownFiltersStr)
+ // if plan is canonicalized, then filter expressions will be normalized. In that case,
+ // skip adding selected partitions to metadata
+ if (!this.isCanonicalizedPlan &&
+ relation.carbonTable.isHivePartitionTable) {
metadata + ("PartitionFilters" -> seqToString(partitionFiltersWithDpp)) +
("PartitionCount" -> selectedPartitions.size.toString)
} else {
@@ -142,14 +147,40 @@ case class CarbonDataSourceScan(
outputAttibutesAfterNormalizingExpressionIds,
QueryPlan.normalizePredicates(partitionFiltersWithoutDpp, output),
QueryPlan.normalizePredicates(dataFilters, output),
- null,
- null,
+ pushedDownProjection,
Seq.empty,
directScanSupport,
extraRDD,
tableIdentifier,
- selectedCatalogPartitions,
+ Seq.empty,
QueryPlan.normalizePredicates(partitionFiltersWithDpp, output)
)
}
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case scan: CarbonDataSourceScan =>
+ if (scan.relation == relation) {
+ var currentPlan = this
+ var otherPlan = scan
+ // In some cases, the plans being compared are not canonicalized. In that case,
+ // pushedDownFilters will not match, since the objects are different. Canonicalize
+ // the plans before comparison, so that exchanges can be reused for better performance
+ if (pushedDownFilters.nonEmpty && scan.pushedDownFilters.nonEmpty) {
+ otherPlan = scan.canonicalized.asInstanceOf[CarbonDataSourceScan]
+ currentPlan = this.canonicalized.asInstanceOf[CarbonDataSourceScan]
+ }
+ // compare metadata, partition filter and data filter expressions
+ currentPlan.metadata == otherPlan.metadata &&
+ currentPlan.partitionFiltersWithDpp.toList.asJava
+ .containsAll(otherPlan.partitionFiltersWithDpp.toList.asJava) &&
+ (currentPlan.dataFilters == otherPlan.dataFilters ||
+ QueryPlan.normalizePredicates(currentPlan.dataFilters, currentPlan.output)
+ == QueryPlan.normalizePredicates(otherPlan.dataFilters, otherPlan.output))
+ } else {
+ false
+ }
+ case _ => false
+ }
+ }
}
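Background for the equals() override above: Spark reuses an Exchange only
when QueryPlan.sameResult holds between the subtrees, and sameResult reduces
to canonicalized equality, roughly:

    // simplified from Spark's QueryPlan, to motivate why the override
    // canonicalizes both plans before comparing pushed-down state
    def sameResult(other: QueryPlan[_]): Boolean =
      this.canonicalized == other.canonicalized

So unless two CarbonDataSourceScan instances compare equal after
canonicalization, the exchange that contains the scan is recomputed instead
of reused.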
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index 8a0779f..824c7fb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -158,9 +158,10 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
SparkSession.getActiveSession.get,
relation.catalogTable.get.identifier
)
- // remove dynamic partition filter from predicates
- filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet, allPredicates)
}
+ // remove dynamic partition filter from predicates
+ filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet,
+ allPredicates, partitionsFilter)
val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
val projects = rawProjects.map {p =>
p.transform {
@@ -232,7 +233,6 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
output,
partitionsFilter.filterNot(SubqueryExpression.hasSubquery),
handledPredicates,
- readCommittedScope,
getCarbonProjection(relationPredicates, requiredColumns, projects),
pushedFilters,
directScanSupport,
diff --git a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 81ff362..2c3483b 100644
--- a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -173,7 +173,9 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
}
}
- def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = {
+ def getDataFilter(partitionSet: AttributeSet,
+ filter: Seq[Expression],
+ partitionFilter: Seq[Expression]): Seq[Expression] = {
filter
}
diff --git a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
index ec2352d..89212a6 100644
--- a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -207,7 +207,9 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
}
}
- def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = {
+ def getDataFilter(partitionSet: AttributeSet,
+ filter: Seq[Expression],
+ partitionFilter: Seq[Expression]): Seq[Expression] = {
filter
}
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
index eaceb85..9ab7888 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -180,9 +180,12 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
}
}
- def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = {
+ def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression],
+ partitionFilter: Seq[Expression]): Seq[Expression] = {
filter.filter {
- case _: DynamicPruningSubquery => false
+ case dp: DynamicPruningSubquery =>
+ // if the filter does not exist in the partition filters, then push down the filter to Spark
+ !partitionFilter.exists(_.semanticEquals(dp))
case _ => true
}
}
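A note on semanticEquals, used in the hunk above: it compares the
canonicalized forms of two expressions, so it tolerates cosmetic differences
that plain == rejects. A minimal, hypothetical illustration (the column name
is made up):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
    import org.apache.spark.sql.types.IntegerType

    // two references to the same underlying column (same ExprId) whose
    // names differ only in case
    val id = ExprId(42)
    val a = AttributeReference("SS_SOLD_DATE_SK", IntegerType)(exprId = id)
    val b = AttributeReference("ss_sold_date_sk", IntegerType)(exprId = id)
    a == b               // false: equals compares attribute names
    a.semanticEquals(b)  // true: canonicalization drops the name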