This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new bdc9484 [CARBONDATA-4271] Support DPP for carbon
bdc9484 is described below
commit bdc9484ac8455e8f53e86367c0e5104364799068
Author: Indhumathi27 <[email protected]>
AuthorDate: Tue Jul 13 17:07:39 2021 +0530
[CARBONDATA-4271] Support DPP for carbon
Why is this PR needed?
This PR enables Dynamic Partition Pruning for carbon.
What changes were proposed in this PR?
CarbonDatasourceHadoopRelation now extends HadoopFsRelation, because Spark
applies its DPP check only to relations that match HadoopFsRelation
(condensed sketch below).
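
For reference, the relation change boils down to roughly the following
(a sketch distilled from the CarbonDatasourceHadoopRelation.scala hunk later
in this patch; the class body is omitted):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, SparkCarbonTableFormat}
    import org.apache.spark.sql.types.StructType

    // The file index is passed as null and the data schema as empty: carbon
    // resolves files and schema itself. HadoopFsRelation is extended only so
    // that Spark's dynamic partition pruning rule accepts the relation.
    class CarbonDatasourceHadoopRelation(
        override val sparkSession: SparkSession,
        val paths: Array[String],
        val parameters: Map[String, String],
        val tableSchema: Option[StructType],
        partitionSchema: StructType = new StructType())
      extends HadoopFsRelation(null,
        partitionSchema,
        new StructType(),
        None,
        new SparkCarbonTableFormat,
        Map.empty)(sparkSession)
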
The dynamic filter is then applied to compute the runtime partitions, which
are set on CarbonScanRDD for pruning (condensed sketch below).
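
The pruning step is condensed below (a sketch distilled from the spark3.1
CarbonDataSourceScanHelper hunk later in this patch; names such as
partitionFiltersWithDpp, selectedCatalogPartitions, tableIdentifier and
carbonRdd come from that hunk, and the session variable is renamed here for
brevity):

    // Only partition filters that contain a plan subquery are dynamic (DPP) filters.
    val dynamicFilter = partitionFiltersWithDpp.filter(exp =>
      exp.find(_.isInstanceOf[PlanExpression[_]]).isDefined)
    if (dynamicFilter.nonEmpty) {
      val session = SparkSQLUtil.getSparkSession
      // Prune the already-selected catalog partitions with the runtime filter ...
      val runtimePartitions = ExternalCatalogUtils.prunePartitionsByFilter(
        session.sessionState.catalog.getTableMetadata(tableIdentifier.get),
        selectedCatalogPartitions,
        dynamicFilter,
        session.sessionState.conf.sessionLocalTimeZone)
      // ... and hand only the surviving partitions to the scan RDD.
      carbonRdd.partitionNames = CarbonFilters.convertToPartitionSpec(runtimePartitions)
    }

The new DynamicPartitionPruningTestCase added at the end of this patch
exercises this path by checking the "PartitionFilters" scan metadata and the
pruned partition count.
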
This closes #4199
---
.../spark/sql/CarbonDataSourceScanHelper.scala | 3 +
.../spark/rdd/CarbonDeltaRowScanRDD.scala | 4 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +-
.../spark/sql/CarbonDatasourceHadoopRelation.scala | 49 ++++-
.../scala/org/apache/spark/sql/CarbonSource.scala | 24 ++-
.../CarbonInsertIntoHadoopFsRelationCommand.scala | 2 -
.../table/CarbonCreateTableAsSelectCommand.scala | 2 +-
.../execution/strategy/CarbonDataSourceScan.scala | 26 ++-
.../sql/execution/strategy/CarbonPlanHelper.scala | 44 ++--
.../execution/strategy/CarbonSourceStrategy.scala | 44 ++--
.../spark/sql/execution/strategy/DMLStrategy.scala | 5 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 2 +-
.../apache/spark/sql/optimizer/CarbonFilters.scala | 6 +-
.../spark/sql/CarbonDataSourceScanHelper.scala | 43 +++-
.../apache/spark/sql/CarbonToSparkAdapter.scala | 8 +-
.../dblocation/DBLocationCarbonTableTestCase.scala | 2 +-
.../sql/DynamicPartitionPruningTestCase.scala | 235 +++++++++++++++++++++
17 files changed, 426 insertions(+), 75 deletions(-)
diff --git a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
index 32a2e34..22183de 100644
--- a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
+++ b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => SparkExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec}
@@ -40,6 +41,8 @@ abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelat
pushedDownProjection: CarbonProjection,
directScanSupport: Boolean,
extraRDD: Option[(RDD[InternalRow], Boolean)],
+ selectedCatalogPartitions: Seq[CatalogTablePartition],
+ partitionFilterWithDpp: Seq[SparkExpression],
segmentIds: Option[String])
extends DataSourceScanExec with ColumnarBatchScan {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
index de6aeb5..949216c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
@@ -44,7 +44,7 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
@transient private val spark: SparkSession,
@transient private val serializedTableInfo: Array[Byte],
@transient private val tableInfo: TableInfo,
- @transient override val partitionNames: Seq[PartitionSpec],
+ @transient private val newPartitionNames: Seq[PartitionSpec],
override val columnProjection: CarbonProjection,
var filter: IndexFilter,
identifier: AbsoluteTableIdentifier,
@@ -62,7 +62,7 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
serializedTableInfo,
tableInfo,
inputMetricsStats,
- partitionNames,
+ newPartitionNames,
dataTypeConverterClz,
readSupportClz) {
override def internalGetPartitions: Array[Partition] = {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index fcb379c..90f1f8b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -83,7 +83,7 @@ class CarbonScanRDD[T: ClassTag](
@transient private val serializedTableInfo: Array[Byte],
@transient private val tableInfo: TableInfo,
inputMetricsStats: InitInputMetrics,
- @transient val partitionNames: Seq[PartitionSpec],
+ @transient var partitionNames: Seq[PartitionSpec],
val dataTypeConverterClz: Class[_ <: DataTypeConverter] =
classOf[SparkDataTypeConverterImpl],
val readSupportClz: Class[_ <: CarbonReadSupport[_]] =
SparkReadSupport.readSupportClass,
@transient var splits: java.util.List[InputSplit] = null,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0d60144..73f8f1c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -17,21 +17,28 @@
package org.apache.spark.sql
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-case class CarbonDatasourceHadoopRelation(
- sparkSession: SparkSession,
- paths: Array[String],
- parameters: Map[String, String],
- tableSchema: Option[StructType],
- limit: Int = -1)
- extends BaseRelation {
+class CarbonDatasourceHadoopRelation(
+ override val sparkSession: SparkSession,
+ val paths: Array[String],
+ val parameters: Map[String, String],
+ val tableSchema: Option[StructType],
+ partitionSchema: StructType = new StructType())
+ extends HadoopFsRelation(null,
+ partitionSchema,
+ new StructType(),
+ None,
+ new SparkCarbonTableFormat,
+ Map.empty)(
+ sparkSession) {
val caseInsensitiveMap: Map[String, String] = parameters.map(f =>
(f._1.toLowerCase, f._2))
lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
@@ -47,9 +54,11 @@ case class CarbonDatasourceHadoopRelation(
@transient lazy val carbonTable: CarbonTable = carbonRelation.carbonTable
+ var limit: Int = -1
+
override def sqlContext: SQLContext = sparkSession.sqlContext
- override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
+ override val schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
@@ -57,5 +66,27 @@ case class CarbonDatasourceHadoopRelation(
"CarbonDatasourceHadoopRelation"
}
+ override def equals(other: Any): Boolean = {
+ other match {
+ case relation: CarbonDatasourceHadoopRelation =>
+ relation.carbonRelation == carbonRelation && (relation.paths sameElements this.paths) &&
+ relation.tableSchema == tableSchema && relation.parameters == this.parameters &&
+ relation.partitionSchema == this.partitionSchema
+ case _ => false
+ }
+ }
+
override def sizeInBytes: Long = carbonRelation.sizeInBytes
+
+ def getTableSchema: Option[StructType] = {
+ tableSchema
+ }
+
+ def getLimit: Int = {
+ limit
+ }
+
+ def setLimit(newLimit: Int): Unit = {
+ this.limit = newLimit
+ }
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 90c7339..864ba04 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.CarbonOption
@@ -63,7 +63,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
// Otherwise create datasource relation
val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
newParameters.get("tablePath") match {
- case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
+ case Some(path) => new CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
Array(path),
newParameters,
None)
@@ -71,7 +71,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
val options = new CarbonOption(newParameters)
val tablePath =
CarbonEnv.getTablePath(options.dbName,
options.tableName)(sqlContext.sparkSession)
- CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
+ new CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
Array(tablePath),
newParameters,
None)
@@ -136,9 +136,21 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
"Table creation failed. Table name cannot contain blank space")
}
val path = getPathForTable(sqlContext.sparkSession, dbName, tableName, newParameters)
-
- CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), newParameters,
- Option(dataSchema))
+ var carbonTable: CarbonTable = null
+ try {
+ carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sqlContext.sparkSession)
+ } catch {
+ case _: Exception =>
+ }
+ val partitionSchema: StructType =
+ if (null != carbonTable && carbonTable.isHivePartitionTable) {
+ StructType(carbonTable.getPartitionInfo.getColumnSchemaList.asScala.map(field =>
+ dataSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
+ } else {
+ new StructType()
+ }
+ new CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), newParameters,
+ Option(dataSchema), partitionSchema)
}
/**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
index eb19440..cfa19db 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
@@ -198,8 +198,6 @@ case class CarbonInsertIntoHadoopFsRelationCommand(
// refresh cached files in FileIndex
fileIndex.foreach(_.refresh())
- // refresh data cache if table is cached
- sparkSession.catalog.refreshByPath(outputPath.toString)
if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index c9748c3..2c6bbfb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -81,7 +81,7 @@ case class CarbonCreateTableAsSelectCommand(
tableName = carbonDataSourceHadoopRelation.carbonRelation.tableName,
options = scala.collection.immutable
.Map("fileheader" ->
- carbonDataSourceHadoopRelation.tableSchema.get.fields.map(_.name).mkString(",")),
+ carbonDataSourceHadoopRelation.getTableSchema.get.fields.map(_.name).mkString(",")),
isOverwriteTable = false,
logicalPlan = query,
tableInfo = tableInfo)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 6dc2df1..fd1336f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonDataSourceScanHelper}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SortOrder, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -41,7 +42,7 @@ import org.apache.carbondata.hadoop.CarbonProjection
case class CarbonDataSourceScan(
@transient relation: CarbonDatasourceHadoopRelation,
output: Seq[Attribute],
- partitionFilters: Seq[SparkExpression],
+ partitionFiltersWithoutDpp: Seq[SparkExpression],
dataFilters: Seq[SparkExpression],
@transient readCommittedScope: ReadCommittedScope,
@transient pushedDownProjection: CarbonProjection,
@@ -49,9 +50,19 @@ case class CarbonDataSourceScan(
directScanSupport: Boolean,
@transient extraRDD: Option[(RDD[InternalRow], Boolean)] = None,
tableIdentifier: Option[TableIdentifier] = None,
+ @transient selectedCatalogPartitions : Seq[CatalogTablePartition] = Seq.empty,
+ @transient partitionFiltersWithDpp: Seq[SparkExpression],
segmentIds: Option[String] = None)
- extends CarbonDataSourceScanHelper(relation, output, partitionFilters, pushedDownFilters,
- pushedDownProjection, directScanSupport, extraRDD, segmentIds) {
+ extends CarbonDataSourceScanHelper(relation,
+ output,
+ partitionFiltersWithoutDpp,
+ pushedDownFilters,
+ pushedDownProjection,
+ directScanSupport,
+ extraRDD,
+ selectedCatalogPartitions,
+ partitionFiltersWithDpp,
+ segmentIds) {
override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val info: BucketingInfo = relation.carbonTable.getBucketingInfo
@@ -89,7 +100,7 @@ case class CarbonDataSourceScan(
"DirectScan" -> (supportsBatchOrColumnar &&
directScanSupport).toString,
"PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
if (relation.carbonTable.isHivePartitionTable) {
- metadata + ("PartitionFilters" -> seqToString(partitionFilters)) +
+ metadata + ("PartitionFilters" -> seqToString(partitionFiltersWithDpp)) +
("PartitionCount" -> selectedPartitions.size.toString)
} else {
metadata
@@ -129,13 +140,16 @@ case class CarbonDataSourceScan(
CarbonDataSourceScan(
relation,
outputAttibutesAfterNormalizingExpressionIds,
- QueryPlan.normalizePredicates(partitionFilters, output),
+ QueryPlan.normalizePredicates(partitionFiltersWithoutDpp, output),
QueryPlan.normalizePredicates(dataFilters, output),
null,
null,
null,
directScanSupport,
extraRDD,
- tableIdentifier)
+ tableIdentifier,
+ selectedCatalogPartitions,
+ QueryPlan.normalizePredicates(partitionFiltersWithDpp, output)
+ )
}
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
index 00a3398..8c7e825 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, CustomDeterministi
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, Rand}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{ExecutedCommandExec, RunnableCommand}
@@ -45,7 +45,7 @@ object CarbonPlanHelper {
databaseNameOp = Some(insertInto.table.carbonRelation.databaseName),
tableName = insertInto.table.carbonRelation.tableName,
options = scala.collection.immutable
- .Map("fileheader" ->
insertInto.table.tableSchema.get.fields.map(_.name).mkString(",")),
+ .Map("fileheader" ->
insertInto.table.getTableSchema.get.fields.map(_.name).mkString(",")),
isOverwriteTable = insertInto.overwrite,
logicalPlan = insertInto.child,
tableInfo = insertInto.table.carbonRelation.carbonTable.getTableInfo,
@@ -202,12 +202,20 @@ object CarbonPlanHelper {
p.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- CarbonToSparkAdapter.createAliasRef(
- CustomDeterministicExpression(exp),
- a.name,
- a.exprId,
- a.qualifier,
- a.explicitMetadata)
+ if (SparkUtil.isSparkVersionXAndAbove("3")) {
+ // create custom deterministic expression for Rand function
+ a.transform {
+ case rand: Rand =>
+ CustomDeterministicExpression(rand)
+ }
+ } else {
+ CarbonToSparkAdapter.createAliasRef(
+ CustomDeterministicExpression(exp),
+ a.name,
+ a.exprId,
+ a.qualifier,
+ a.explicitMetadata)
+ }
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
makeDeterministicExp(exp)
@@ -220,12 +228,20 @@ object CarbonPlanHelper {
f.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
- CarbonToSparkAdapter.createAliasRef(
- CustomDeterministicExpression(exp),
- a.name,
- a.exprId,
- a.qualifier,
- a.explicitMetadata)
+ if (SparkUtil.isSparkVersionXAndAbove("3")) {
+ // create custom deterministic expression for Rand function
+ a.transform {
+ case rand: Rand =>
+ CustomDeterministicExpression(rand)
+ }
+ } else {
+ CarbonToSparkAdapter.createAliasRef(
+ CustomDeterministicExpression(exp),
+ a.name,
+ a.exprId,
+ a.qualifier,
+ a.explicitMetadata)
+ }
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
makeDeterministicExp(exp)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index cdd8183..ed0da08 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -23,6 +23,7 @@ import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{expressions, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, _}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
@@ -35,6 +36,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.scan.expression.{Expression => CarbonFilter}
import org.apache.carbondata.core.util.CarbonProperties
@@ -120,18 +122,17 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
}
private def getPartitionFilter(relation: LogicalRelation,
- filterPredicates: Seq[Expression]): Seq[Expression] = {
- val names = relation.catalogTable.map(_.partitionColumnNames).getOrElse(Seq.empty)
+ filterPredicates: Seq[Expression], names: Seq[String]): (Seq[Expression], AttributeSet) = {
// Get the current partitions from table.
if (names.nonEmpty) {
val partitionSet = AttributeSet(names
.map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
// Update the name with lower case as it is case sensitive while getting partition info.
- CarbonToSparkAdapter
+ (CarbonToSparkAdapter
.getPartitionFilter(partitionSet, filterPredicates)
- .map(CarbonToSparkAdapter.lowerCaseAttribute)
+ .map(CarbonToSparkAdapter.lowerCaseAttribute), partitionSet)
} else {
- Seq.empty
+ (Seq.empty, AttributeSet.empty)
}
}
@@ -143,7 +144,22 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
relation: LogicalRelation,
rawProjects: Seq[NamedExpression],
allPredicates: Seq[Expression]): SparkPlan = {
- val partitionsFilter = getPartitionFilter(relation, allPredicates)
+ // get partition column names
+ val names = relation.catalogTable.map(_.partitionColumnNames).getOrElse(Seq.empty)
+ // get partition set from filter expression
+ val (partitionsFilter, partitionSet) = getPartitionFilter(relation, allPredicates, names)
+ var partitions : (Seq[CatalogTablePartition], Seq[PartitionSpec], Seq[Expression]) =
+ (null, null, Seq.empty)
+ var filterPredicates = allPredicates
+ if(names.nonEmpty) {
+ partitions = CarbonFilters.getCatalogTablePartitions(
+ partitionsFilter.filterNot(e => e.find(_.isInstanceOf[PlanExpression[_]]).isDefined),
+ SparkSession.getActiveSession.get,
+ relation.catalogTable.get.identifier
+ )
+ // remove dynamic partition filter from predicates
+ filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet, allPredicates)
+ }
val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
val projects = rawProjects.map {p =>
p.transform {
@@ -154,9 +170,9 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
// contains the original order of the projection requested
val projectsAttr = projects.flatMap(_.references)
val projectSet = AttributeSet(projectsAttr)
- val filterSet = AttributeSet(allPredicates.flatMap(_.references))
+ val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
- val relationPredicates = allPredicates.map {
+ val relationPredicates = filterPredicates.map {
_ transform {
case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
}
@@ -168,7 +184,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
// A set of column attributes that are only referenced by pushed down filters. We can eliminate
// them from requested columns.
val handledSet = {
- val handledPredicates = allPredicates.filterNot(unhandledPredicates.contains)
+ val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
try {
AttributeSet(handledPredicates.flatMap(_.references)) --
@@ -184,7 +200,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
val readCommittedScope =
new TableStatusReadCommittedScope(table.identifier,
FileFactory.getConfiguration)
val extraSegments = MixedFormatHandler.extraSegments(table.identifier,
readCommittedScope)
- val extraRDD = MixedFormatHandler.extraRDD(relation, rawProjects, allPredicates,
+ val extraRDD = MixedFormatHandler.extraRDD(relation, rawProjects, filterPredicates,
readCommittedScope, table.identifier, extraSegments)
val vectorPushRowFilters =
vectorPushRowFiltersEnabled(relationPredicates, extraSegments.nonEmpty)
@@ -213,7 +229,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
val scan = CarbonDataSourceScan(
table,
output,
- partitionsFilter,
+ partitionsFilter.filterNot(SubqueryExpression.hasSubquery),
handledPredicates,
readCommittedScope,
getCarbonProjection(relationPredicates, requiredColumns, projects),
@@ -222,14 +238,16 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
extraRDD,
Some(TableIdentifier(table.identifier.getTableName,
Option(table.identifier.getDatabaseName))),
+ partitions._1,
+ partitionsFilter,
segmentIds
)
// filter
val filterOption = if (directScanSupport && CarbonToSparkAdapter
.supportsBatchOrColumnar(scan)) {
- allPredicates.reduceLeftOption(expressions.And)
+ filterPredicates.reduceLeftOption(expressions.And)
} else if (extraSegments.nonEmpty) {
- allPredicates.reduceLeftOption(expressions.And)
+ filterPredicates.reduceLeftOption(expressions.And)
} else {
filterCondition
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
index 8dbfc5a..9236e1e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
@@ -374,9 +374,10 @@ object DMLStrategy extends SparkStrategy {
def pushLimit(limit: Int, plan: LogicalPlan): LogicalPlan = {
val newPlan = plan transform {
case lr: LogicalRelation =>
- val newRelation = lr.copy(relation = lr.relation
+ val relation = lr.relation
.asInstanceOf[CarbonDatasourceHadoopRelation]
- .copy(limit = limit))
+ relation.setLimit(limit)
+ val newRelation = lr.copy(relation = relation)
newRelation
case other => other
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index dc32eac..4186405 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -559,7 +559,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
case None =>
CarbonEnv.getTablePath(tableIdentifier.database, tableIdentifier.table)(sparkSession)
}
- CarbonDatasourceHadoopRelation(sparkSession,
+ new CarbonDatasourceHadoopRelation(sparkSession,
Array(tableLocation.asInstanceOf[String]),
catalogTable.storage.properties,
Option(catalogTable.schema))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 09bdf18..2dd8c23 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -675,11 +675,11 @@ object CarbonFilters {
if (!carbonTable.isHivePartitionTable) {
return (null, null, partitionFilters)
}
- val partititions = getCatalogTablePartitions(partitionFilters, sparkSession, carbonTable)
- if (partititions.isEmpty) {
+ val partitions = getCatalogTablePartitions(partitionFilters, sparkSession, carbonTable)
+ if (partitions.isEmpty) {
(Seq.empty, Seq.empty, partitionFilters)
} else {
- (partititions, convertToPartitionSpec(partititions), partitionFilters)
+ (partitions, convertToPartitionSpec(partitions), partitionFilters)
}
}
}
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
index 8fe14e6..20a8f5b 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => SparkExpression}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => SparkExpression, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.execution.{DataSourceScanExec, WholeStageCodegenExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.strategy.CarbonPlanHelper
import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.carbondata.core.index.IndexFilter
@@ -35,14 +37,16 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.spark.rdd.CarbonScanRDD
-abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelation,
- output: Seq[Attribute],
- partitionFilters: Seq[SparkExpression],
- pushedDownFilters: Seq[Expression],
- pushedDownProjection: CarbonProjection,
- directScanSupport: Boolean,
- extraRDD: Option[(RDD[InternalRow], Boolean)],
- segmentIds: Option[String])
+abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelation,
+ output: Seq[Attribute],
+ partitionFiltersWithoutDpp: Seq[SparkExpression],
+ pushedDownFilters: Seq[Expression],
+ pushedDownProjection: CarbonProjection,
+ directScanSupport: Boolean,
+ extraRDD: Option[(RDD[InternalRow], Boolean)],
+ selectedCatalogPartitions: Seq[CatalogTablePartition],
+ partitionFiltersWithDpp: Seq[SparkExpression],
+ segmentIds: Option[String])
extends DataSourceScanExec {
override lazy val supportsColumnar: Boolean = CarbonPlanHelper
@@ -63,7 +67,7 @@ abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelat
.map(new IndexFilter(relation.carbonTable, _, true)).orNull
if (filter != null && pushedDownFilters.length == 1) {
// push down the limit if only one filter
- filter.setLimit(relation.limit)
+ filter.setLimit(relation.getLimit)
}
filter
}
@@ -89,11 +93,13 @@ abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelat
@transient lazy val selectedPartitions: Seq[PartitionSpec] = {
CarbonFilters
- .getPartitions(partitionFilters, relation.sparkSession, relation.carbonTable)
+ .getPartitions(partitionFiltersWithoutDpp, relation.sparkSession, relation.carbonTable)
.orNull
}
lazy val inputRDD: RDD[InternalRow] = {
+ val dynamicFilter = partitionFiltersWithDpp.filter(exp =>
+ exp.find(_.isInstanceOf[PlanExpression[_]]).isDefined)
val carbonRdd = new CarbonScanRDD[InternalRow](
relation.sparkSession,
pushedDownProjection,
@@ -104,6 +110,21 @@ abstract class CarbonDataSourceScanHelper(relation: CarbonDatasourceHadoopRelat
new CarbonInputMetrics,
selectedPartitions,
segmentIds = segmentIds)
+ if(dynamicFilter.nonEmpty) {
+ carbonRdd match {
+ case carbonRdd: CarbonScanRDD[InternalRow] =>
+ // prune dynamic partitions based on filter
+ val sparkExpression = SparkSQLUtil.getSparkSession
+ val runtimePartitions = ExternalCatalogUtils.prunePartitionsByFilter(
+ sparkExpression.sessionState.catalog.getTableMetadata(tableIdentifier.get),
+ selectedCatalogPartitions,
+ dynamicFilter,
+ sparkExpression.sessionState.conf.sessionLocalTimeZone
+ )
+ // set partitions to carbon rdd
+ carbonRdd.partitionNames = CarbonFilters.convertToPartitionSpec(runtimePartitions)
+ }
+ }
carbonRdd.setVectorReaderSupport(supportsColumnar)
carbonRdd.setDirectScanSupport(supportsColumnar && directScanSupport)
extraRDD.map(_._1.union(carbonRdd)).getOrElse(carbonRdd)
diff --git a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 2d60667..25a27ad 100644
--- a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, NamedExpression, Predicate, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, DynamicPruningSubquery, Expression, ExprId, NamedExpression, Predicate, ScalaUDF, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
@@ -174,14 +174,16 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
partitionSet: AttributeSet,
filterPredicates: Seq[Expression]): Seq[Expression] = {
filterPredicates
- .filterNot(SubqueryExpression.hasSubquery)
.filter { filter =>
filter.references.nonEmpty && filter.references.subsetOf(partitionSet)
}
}
def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): Seq[Expression] = {
- filter
+ filter.filter {
+ case _: DynamicPruningSubquery => false
+ case _ => true
+ }
}
// As per SPARK-22520 OptimizeCodegen is removed in 2.3.1
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
index 8c9b7b1..50c70e5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
@@ -268,7 +268,7 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach {
}
test("Alter table drop column test") {
- sql(s"create database carbon location '$dbLocation'")
+ sql(s"create database carbon location '$dbLocation/newdb/'")
sql("use carbon")
sql(
"""create table carbon.carbontable (
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningTestCase.scala
new file mode 100644
index 0000000..cc9ff36
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningTestCase.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.util.SparkUtil
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+class DynamicPartitionPruningTestCase extends QueryTest with BeforeAndAfterEach {
+
+ override protected def beforeEach(): Unit = {
+ sql("drop table if exists dpp_table1")
+ sql("drop table if exists dpp_table2")
+ }
+
+ override protected def afterEach(): Unit = {
+ sql("drop table if exists dpp_table1")
+ sql("drop table if exists dpp_table2")
+ }
+
+ test("test partition pruning: carbon join other format table") {
+ sql("create table dpp_table1(col1 int) partitioned by (col2 string) stored
as carbondata")
+ sql("insert into dpp_table1 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+ sql("create table dpp_table2(col1 int, col2 string) ")
+ sql("insert into dpp_table2 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+
+ // partition table without filter
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col1 in (1,2)",
+ "dpp_table1",
+ 2,
+ 4)
+
+ // partition table with filter on partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col2 in ('b','c') and t2.col1 in (1,2)",
+ "dpp_table1",
+ 1,
+ 2)
+
+ // partition table with filter on normal column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t2.col1 in (1,2)",
+ "dpp_table1",
+ 2,
+ 4)
+
+ // partition table with filter on normal column and partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t1.col2 in ('b', 'c')
and t2.col1 in (1,2)",
+ "dpp_table1",
+ 1,
+ 2)
+ }
+
+ test("test partition pruning: carbon join format table") {
+ sql("create table dpp_table1(col1 int) partitioned by (col2 string) stored
as carbondata")
+ sql("insert into dpp_table1 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+ sql("create table dpp_table2(col1 int, col2 string) stored as carbondata")
+ sql("insert into dpp_table2 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+
+ // partition table without filter
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col1 in (1,2)",
+ "dpp_table1",
+ 2,
+ 4)
+
+ // partition table with filter on partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col2 in ('b','c') and t2.col1 in (1,2)",
+ "dpp_table1",
+ 1,
+ 2)
+
+ // partition table with filter on normal column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t2.col1 in (1,2)",
+ "dpp_table1",
+ 2,
+ 4)
+
+ // partition table with filter on normal column and partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t1.col2 in ('b', 'c')
and t2.col1 in (1,2)",
+ "dpp_table1",
+ 1,
+ 2)
+ }
+
+ test("test partition pruning: partitioned table join partitioned table") {
+ sql("create table dpp_table1(col1 int) partitioned by (col2 string) stored
as carbondata")
+ sql("insert into dpp_table1 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+ sql("create table dpp_table2(col1 int) partitioned by (col2 string) stored
as carbondata")
+ sql("insert into dpp_table2 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+
+ // partition table without filter
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col1 in (1,2)",
+ "dpp_table1",
+ 2,
+ 4)
+
+ // right table without filter
+ verifyPartitionPruning(
+ "select /** BROADCAST(t1) */ t1.col1 from dpp_table1 t1, dpp_table2 t2 "
+
+ "where t1.col2=t2.col2 and t1.col1 in (1,2)",
+ "dpp_table2",
+ 4,
+ 4)
+
+ // left table with filter on partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col2 in ('b', 'c') and t2.col1 in (1,2)",
+ "dpp_table1",
+ 1,
+ 2)
+
+ // right table with filter on partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col2 in ('b', 'c') and t1.col1 in (1,2)",
+ "dpp_table2",
+ 2,
+ 2,
+ false)
+
+ // left table with filter on normal column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t2.col1 in (1,2)",
+ "dpp_table1",
+ 2,
+ 4)
+
+ // right table with filter on normal column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col1 in (2, 3) and t1.col1 in (1,2)",
+ "dpp_table2",
+ 4,
+ 4,
+ false)
+
+ // left table with filter on normal column and partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t1.col2 in ('b', 'c')
and t2.col1 in (1,2)",
+ "dpp_table1",
+ 1,
+ 2)
+
+ // right table with filter on normal column and partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col1 in (2, 3) and t2.col2 in ('b', 'c')
and t1.col1 in (1,2)",
+ "dpp_table2",
+ 2,
+ 2,
+ false)
+
+ // both tables with filter on normal column and partition column
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t1.col2 in ('a', 'b',
'c') and " +
+ "t2.col1 = 2 and t2.col2 in ('a', 'b')",
+ "dpp_table1",
+ 1,
+ 2)
+
+ verifyPartitionPruning(
+ "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+ "where t1.col2=t2.col2 and t2.col1 in (2, 3) and t2.col2 in ('a', 'b',
'c') and " +
+ "t1.col1 = 2 and t2.col2 in ('a', 'b')",
+ "dpp_table2",
+ 2,
+ 2,
+ false)
+ }
+
+ private def verifyPartitionPruning(sqlText: String,
+ tableName: String,
+ dynamicPartitionPruning: Int,
+ staticPartitionPruning: Int,
+ hasDynamicPruning: Boolean = true): Unit = {
+ if (SPARK_VERSION.startsWith("3")) {
+ val df = sql(sqlText)
+ df.collect()
+ val ds = df.queryExecution.executedPlan.find { plan =>
+ plan.isInstanceOf[CarbonDataSourceScan] &&
+ plan.asInstanceOf[CarbonDataSourceScan].tableIdentifier.get.table.equals(tableName)
+ }
+ val carbonDs = ds.get.asInstanceOf[CarbonDataSourceScan]
+ val carbonRdd = carbonDs.inputRDDs().head.asInstanceOf[CarbonScanRDD[InternalRow]]
+ assert(carbonDs.metadata.contains("PartitionFilters"))
+ if (SparkUtil.isSparkVersionXAndAbove("2.4")) {
+ if (hasDynamicPruning) {
+ assert(carbonDs.metadata("PartitionFilters").contains("dynamicpruning"))
+ }
+ assertResult(dynamicPartitionPruning)(carbonRdd.partitionNames.size)
+ } else {
+ assertResult(staticPartitionPruning)(carbonRdd.partitionNames.size)
+ }
+ }
+ }
+}