This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bdc9484  [CARBONDATA-4271] Support DPP for carbon
bdc9484 is described below

commit bdc9484ac8455e8f53e86367c0e5104364799068
Author: Indhumathi27 <[email protected]>
AuthorDate: Tue Jul 13 17:07:39 2021 +0530

    [CARBONDATA-4271] Support DPP for carbon
    
    Why is this PR needed?
    This PR enables Dynamic Partition Pruning for carbon.
    
    What changes were proposed in this PR?
    CarbonDatasourceHadoopRelation has to extend HadoopFsRelation,
    because spark has added a check to use DPP only for relation matching 
HadoopFsRelation
    Apply Dynamic filter and get runtimePartitions and set this to 
CarbonScanRDD for pruning
    
    This closes #4199
---
 .../spark/sql/CarbonDataSourceScanHelper.scala     |   3 +
 .../spark/rdd/CarbonDeltaRowScanRDD.scala          |   4 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |   2 +-
 .../spark/sql/CarbonDatasourceHadoopRelation.scala |  49 ++++-
 .../scala/org/apache/spark/sql/CarbonSource.scala  |  24 ++-
 .../CarbonInsertIntoHadoopFsRelationCommand.scala  |   2 -
 .../table/CarbonCreateTableAsSelectCommand.scala   |   2 +-
 .../execution/strategy/CarbonDataSourceScan.scala  |  26 ++-
 .../sql/execution/strategy/CarbonPlanHelper.scala  |  44 ++--
 .../execution/strategy/CarbonSourceStrategy.scala  |  44 ++--
 .../spark/sql/execution/strategy/DMLStrategy.scala |   5 +-
 .../spark/sql/hive/CarbonFileMetastore.scala       |   2 +-
 .../apache/spark/sql/optimizer/CarbonFilters.scala |   6 +-
 .../spark/sql/CarbonDataSourceScanHelper.scala     |  43 +++-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |   8 +-
 .../dblocation/DBLocationCarbonTableTestCase.scala |   2 +-
 .../sql/DynamicPartitionPruningTestCase.scala      | 235 +++++++++++++++++++++
 17 files changed, 426 insertions(+), 75 deletions(-)

diff --git 
a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
 
b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
index 32a2e34..22183de 100644
--- 
a/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
+++ 
b/integration/spark/src/main/common2.3and2.4/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => 
SparkExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.{ColumnarBatchScan, DataSourceScanExec}
@@ -40,6 +41,8 @@ abstract class  CarbonDataSourceScanHelper(relation: 
CarbonDatasourceHadoopRelat
     pushedDownProjection: CarbonProjection,
     directScanSupport: Boolean,
     extraRDD: Option[(RDD[InternalRow], Boolean)],
+    selectedCatalogPartitions: Seq[CatalogTablePartition],
+    partitionFilterWithDpp: Seq[SparkExpression],
     segmentIds: Option[String])
   extends DataSourceScanExec with ColumnarBatchScan {
 
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
index de6aeb5..949216c 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeltaRowScanRDD.scala
@@ -44,7 +44,7 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
     @transient private val spark: SparkSession,
     @transient private val serializedTableInfo: Array[Byte],
     @transient private val tableInfo: TableInfo,
-    @transient override val partitionNames: Seq[PartitionSpec],
+    @transient private val newPartitionNames: Seq[PartitionSpec],
     override val columnProjection: CarbonProjection,
     var filter: IndexFilter,
     identifier: AbsoluteTableIdentifier,
@@ -62,7 +62,7 @@ class CarbonDeltaRowScanRDD[T: ClassTag](
     serializedTableInfo,
     tableInfo,
     inputMetricsStats,
-    partitionNames,
+    newPartitionNames,
     dataTypeConverterClz,
     readSupportClz) {
   override def internalGetPartitions: Array[Partition] = {
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index fcb379c..90f1f8b 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -83,7 +83,7 @@ class CarbonScanRDD[T: ClassTag](
     @transient private val serializedTableInfo: Array[Byte],
     @transient private val tableInfo: TableInfo,
     inputMetricsStats: InitInputMetrics,
-    @transient val partitionNames: Seq[PartitionSpec],
+    @transient var partitionNames: Seq[PartitionSpec],
     val dataTypeConverterClz: Class[_ <: DataTypeConverter] = 
classOf[SparkDataTypeConverterImpl],
     val readSupportClz: Class[_ <: CarbonReadSupport[_]] = 
SparkReadSupport.readSupportClass,
     @transient var splits: java.util.List[InputSplit] = null,
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0d60144..73f8f1c 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -17,21 +17,28 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
SparkCarbonTableFormat}
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
-case class CarbonDatasourceHadoopRelation(
-    sparkSession: SparkSession,
-    paths: Array[String],
-    parameters: Map[String, String],
-    tableSchema: Option[StructType],
-    limit: Int = -1)
-  extends BaseRelation {
+class CarbonDatasourceHadoopRelation(
+    override val sparkSession: SparkSession,
+    val paths: Array[String],
+    val parameters: Map[String, String],
+    val tableSchema: Option[StructType],
+    partitionSchema: StructType = new StructType())
+  extends HadoopFsRelation(null,
+    partitionSchema,
+    new StructType(),
+    None,
+    new SparkCarbonTableFormat,
+    Map.empty)(
+    sparkSession) {
 
   val caseInsensitiveMap: Map[String, String] = parameters.map(f => 
(f._1.toLowerCase, f._2))
   lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
@@ -47,9 +54,11 @@ case class CarbonDatasourceHadoopRelation(
 
   @transient lazy val carbonTable: CarbonTable = carbonRelation.carbonTable
 
+  var limit: Int = -1
+
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
-  override def schema: StructType = 
tableSchema.getOrElse(carbonRelation.schema)
+  override val schema: StructType = 
tableSchema.getOrElse(carbonRelation.schema)
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new 
Array[Filter](0)
 
@@ -57,5 +66,27 @@ case class CarbonDatasourceHadoopRelation(
     "CarbonDatasourceHadoopRelation"
   }
 
+  override def equals(other: Any): Boolean = {
+    other match {
+      case relation: CarbonDatasourceHadoopRelation =>
+        relation.carbonRelation == carbonRelation && (relation.paths 
sameElements this.paths) &&
+        relation.tableSchema == tableSchema && relation.parameters == 
this.parameters &&
+        relation.partitionSchema == this.partitionSchema
+      case _ => false
+    }
+  }
+
   override def sizeInBytes: Long = carbonRelation.sizeInBytes
+
+  def getTableSchema: Option[StructType] = {
+    tableSchema
+  }
+
+  def getLimit: Int = {
+    limit
+  }
+
+  def setLimit(newLimit: Int): Unit = {
+    this.limit = newLimit
+  }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 90c7339..864ba04 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.CarbonOption
@@ -63,7 +63,7 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
     // Otherwise create datasource relation
     val newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
     newParameters.get("tablePath") match {
-      case Some(path) => 
CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
+      case Some(path) => new 
CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
         Array(path),
         newParameters,
         None)
@@ -71,7 +71,7 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
         val options = new CarbonOption(newParameters)
         val tablePath =
           CarbonEnv.getTablePath(options.dbName, 
options.tableName)(sqlContext.sparkSession)
-        CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
+        new CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
           Array(tablePath),
           newParameters,
           None)
@@ -136,9 +136,21 @@ class CarbonSource extends CreatableRelationProvider with 
RelationProvider
         "Table creation failed. Table name cannot contain blank space")
     }
     val path = getPathForTable(sqlContext.sparkSession, dbName, tableName, 
newParameters)
-
-    CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), 
newParameters,
-      Option(dataSchema))
+    var carbonTable: CarbonTable = null
+    try {
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), 
tableName)(sqlContext.sparkSession)
+    } catch {
+      case _: Exception =>
+    }
+    val partitionSchema: StructType =
+      if (null != carbonTable && carbonTable.isHivePartitionTable) {
+        
StructType(carbonTable.getPartitionInfo.getColumnSchemaList.asScala.map(field =>
+          
dataSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
+      } else {
+        new StructType()
+      }
+    new CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), 
newParameters,
+      Option(dataSchema), partitionSchema)
   }
 
   /**
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
index eb19440..cfa19db 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
@@ -198,8 +198,6 @@ case class CarbonInsertIntoHadoopFsRelationCommand(
 
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
-      // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index c9748c3..2c6bbfb 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -81,7 +81,7 @@ case class CarbonCreateTableAsSelectCommand(
         tableName = carbonDataSourceHadoopRelation.carbonRelation.tableName,
         options = scala.collection.immutable
           .Map("fileheader" ->
-               
carbonDataSourceHadoopRelation.tableSchema.get.fields.map(_.name).mkString(",")),
+               
carbonDataSourceHadoopRelation.getTableSchema.get.fields.map(_.name).mkString(",")),
         isOverwriteTable = false,
         logicalPlan = query,
         tableInfo = tableInfo)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
index 6dc2df1..fd1336f 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, 
CarbonDataSourceScanHelper}
 import 
org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, SortOrder, UnsafeProjection}
 import org.apache.spark.sql.catalyst.expressions.{Expression => 
SparkExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -41,7 +42,7 @@ import org.apache.carbondata.hadoop.CarbonProjection
 case class CarbonDataSourceScan(
     @transient relation: CarbonDatasourceHadoopRelation,
     output: Seq[Attribute],
-    partitionFilters: Seq[SparkExpression],
+    partitionFiltersWithoutDpp: Seq[SparkExpression],
     dataFilters: Seq[SparkExpression],
     @transient readCommittedScope: ReadCommittedScope,
     @transient pushedDownProjection: CarbonProjection,
@@ -49,9 +50,19 @@ case class CarbonDataSourceScan(
     directScanSupport: Boolean,
     @transient extraRDD: Option[(RDD[InternalRow], Boolean)] = None,
     tableIdentifier: Option[TableIdentifier] = None,
+    @transient selectedCatalogPartitions : Seq[CatalogTablePartition] = 
Seq.empty,
+    @transient partitionFiltersWithDpp: Seq[SparkExpression],
     segmentIds: Option[String] = None)
-  extends CarbonDataSourceScanHelper(relation, output, partitionFilters, 
pushedDownFilters,
-    pushedDownProjection, directScanSupport, extraRDD, segmentIds) {
+  extends CarbonDataSourceScanHelper(relation,
+    output,
+    partitionFiltersWithoutDpp,
+    pushedDownFilters,
+    pushedDownProjection,
+    directScanSupport,
+    extraRDD,
+    selectedCatalogPartitions,
+    partitionFiltersWithDpp,
+    segmentIds) {
 
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
     val info: BucketingInfo = relation.carbonTable.getBucketingInfo
@@ -89,7 +100,7 @@ case class CarbonDataSourceScan(
         "DirectScan" -> (supportsBatchOrColumnar && 
directScanSupport).toString,
         "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
     if (relation.carbonTable.isHivePartitionTable) {
-      metadata + ("PartitionFilters" -> seqToString(partitionFilters)) +
+      metadata + ("PartitionFilters" -> seqToString(partitionFiltersWithDpp)) +
         ("PartitionCount" -> selectedPartitions.size.toString)
     } else {
       metadata
@@ -129,13 +140,16 @@ case class CarbonDataSourceScan(
     CarbonDataSourceScan(
       relation,
       outputAttibutesAfterNormalizingExpressionIds,
-      QueryPlan.normalizePredicates(partitionFilters, output),
+      QueryPlan.normalizePredicates(partitionFiltersWithoutDpp, output),
       QueryPlan.normalizePredicates(dataFilters, output),
       null,
       null,
       null,
       directScanSupport,
       extraRDD,
-      tableIdentifier)
+      tableIdentifier,
+      selectedCatalogPartitions,
+      QueryPlan.normalizePredicates(partitionFiltersWithDpp, output)
+    )
   }
 }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
index 00a3398..8c7e825 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonPlanHelper.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, 
CustomDeterministi
 import 
org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
Expression, NamedExpression, Rand}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{ExecutedCommandExec, 
RunnableCommand}
@@ -45,7 +45,7 @@ object CarbonPlanHelper {
       databaseNameOp = Some(insertInto.table.carbonRelation.databaseName),
       tableName = insertInto.table.carbonRelation.tableName,
       options = scala.collection.immutable
-        .Map("fileheader" -> 
insertInto.table.tableSchema.get.fields.map(_.name).mkString(",")),
+        .Map("fileheader" -> 
insertInto.table.getTableSchema.get.fields.map(_.name).mkString(",")),
       isOverwriteTable = insertInto.overwrite,
       logicalPlan = insertInto.child,
       tableInfo = insertInto.table.carbonRelation.carbonTable.getTableInfo,
@@ -202,12 +202,20 @@ object CarbonPlanHelper {
           p.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && 
!exp.isInstanceOf[CustomDeterministicExpression] =>
-              CarbonToSparkAdapter.createAliasRef(
-                CustomDeterministicExpression(exp),
-                a.name,
-                a.exprId,
-                a.qualifier,
-                a.explicitMetadata)
+              if (SparkUtil.isSparkVersionXAndAbove("3")) {
+                // create custom deterministic expression for Rand function
+                a.transform {
+                  case rand: Rand =>
+                    CustomDeterministicExpression(rand)
+                }
+              } else {
+                CarbonToSparkAdapter.createAliasRef(
+                  CustomDeterministicExpression(exp),
+                  a.name,
+                  a.exprId,
+                  a.qualifier,
+                  a.explicitMetadata)
+              }
             case exp: NamedExpression
               if !exp.deterministic && 
!exp.isInstanceOf[CustomDeterministicExpression] =>
               makeDeterministicExp(exp)
@@ -220,12 +228,20 @@ object CarbonPlanHelper {
           f.transformAllExpressions {
             case a@Alias(exp, _)
               if !exp.deterministic && 
!exp.isInstanceOf[CustomDeterministicExpression] =>
-              CarbonToSparkAdapter.createAliasRef(
-                CustomDeterministicExpression(exp),
-                a.name,
-                a.exprId,
-                a.qualifier,
-                a.explicitMetadata)
+              if (SparkUtil.isSparkVersionXAndAbove("3")) {
+                // create custom deterministic expression for Rand function
+                a.transform {
+                  case rand: Rand =>
+                    CustomDeterministicExpression(rand)
+                }
+              } else {
+                CarbonToSparkAdapter.createAliasRef(
+                  CustomDeterministicExpression(exp),
+                  a.name,
+                  a.exprId,
+                  a.qualifier,
+                  a.explicitMetadata)
+              }
             case exp: NamedExpression
               if !exp.deterministic && 
!exp.isInstanceOf[CustomDeterministicExpression] =>
               makeDeterministicExp(exp)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index cdd8183..ed0da08 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -23,6 +23,7 @@ import org.apache.log4j.Logger
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{expressions, InternalRow, 
TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, _}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
@@ -35,6 +36,7 @@ import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.expression.{Expression => CarbonFilter}
 import org.apache.carbondata.core.util.CarbonProperties
@@ -120,18 +122,17 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
   }
 
   private def getPartitionFilter(relation: LogicalRelation,
-      filterPredicates: Seq[Expression]): Seq[Expression] = {
-    val names = 
relation.catalogTable.map(_.partitionColumnNames).getOrElse(Seq.empty)
+      filterPredicates: Seq[Expression], names: Seq[String]): 
(Seq[Expression], AttributeSet) = {
     // Get the current partitions from table.
     if (names.nonEmpty) {
       val partitionSet = AttributeSet(names
         .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
       // Update the name with lower case as it is case sensitive while getting 
partition info.
-      CarbonToSparkAdapter
+      (CarbonToSparkAdapter
         .getPartitionFilter(partitionSet, filterPredicates)
-        .map(CarbonToSparkAdapter.lowerCaseAttribute)
+        .map(CarbonToSparkAdapter.lowerCaseAttribute), partitionSet)
     } else {
-      Seq.empty
+      (Seq.empty, AttributeSet.empty)
     }
   }
 
@@ -143,7 +144,22 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
       relation: LogicalRelation,
       rawProjects: Seq[NamedExpression],
       allPredicates: Seq[Expression]): SparkPlan = {
-    val partitionsFilter = getPartitionFilter(relation, allPredicates)
+    // get partition column names
+    val names = 
relation.catalogTable.map(_.partitionColumnNames).getOrElse(Seq.empty)
+    // get partition set from filter expression
+    val (partitionsFilter, partitionSet) = getPartitionFilter(relation, 
allPredicates, names)
+    var partitions : (Seq[CatalogTablePartition], Seq[PartitionSpec], 
Seq[Expression]) =
+      (null, null, Seq.empty)
+    var filterPredicates = allPredicates
+    if(names.nonEmpty) {
+      partitions = CarbonFilters.getCatalogTablePartitions(
+        partitionsFilter.filterNot(e => 
e.find(_.isInstanceOf[PlanExpression[_]]).isDefined),
+        SparkSession.getActiveSession.get,
+        relation.catalogTable.get.identifier
+      )
+      // remove dynamic partition filter from predicates
+      filterPredicates = CarbonToSparkAdapter.getDataFilter(partitionSet, 
allPredicates)
+    }
     val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     val projects = rawProjects.map {p =>
       p.transform {
@@ -154,9 +170,9 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
     // contains the original order of the projection requested
     val projectsAttr = projects.flatMap(_.references)
     val projectSet = AttributeSet(projectsAttr)
-    val filterSet = AttributeSet(allPredicates.flatMap(_.references))
+    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
 
-    val relationPredicates = allPredicates.map {
+    val relationPredicates = filterPredicates.map {
       _ transform {
         case a: AttributeReference => relation.attributeMap(a) // Match 
original case of attributes.
       }
@@ -168,7 +184,7 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
     // A set of column attributes that are only referenced by pushed down 
filters.  We can eliminate
     // them from requested columns.
     val handledSet = {
-      val handledPredicates = 
allPredicates.filterNot(unhandledPredicates.contains)
+      val handledPredicates = 
filterPredicates.filterNot(unhandledPredicates.contains)
       val unhandledSet = 
AttributeSet(unhandledPredicates.flatMap(_.references))
       try {
         AttributeSet(handledPredicates.flatMap(_.references)) --
@@ -184,7 +200,7 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
     val readCommittedScope =
       new TableStatusReadCommittedScope(table.identifier, 
FileFactory.getConfiguration)
     val extraSegments = MixedFormatHandler.extraSegments(table.identifier, 
readCommittedScope)
-    val extraRDD = MixedFormatHandler.extraRDD(relation, rawProjects, 
allPredicates,
+    val extraRDD = MixedFormatHandler.extraRDD(relation, rawProjects, 
filterPredicates,
       readCommittedScope, table.identifier, extraSegments)
     val vectorPushRowFilters =
       vectorPushRowFiltersEnabled(relationPredicates, extraSegments.nonEmpty)
@@ -213,7 +229,7 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
     val scan = CarbonDataSourceScan(
       table,
       output,
-      partitionsFilter,
+      partitionsFilter.filterNot(SubqueryExpression.hasSubquery),
       handledPredicates,
       readCommittedScope,
       getCarbonProjection(relationPredicates, requiredColumns, projects),
@@ -222,14 +238,16 @@ private[sql] object CarbonSourceStrategy extends 
SparkStrategy {
       extraRDD,
       Some(TableIdentifier(table.identifier.getTableName,
         Option(table.identifier.getDatabaseName))),
+      partitions._1,
+      partitionsFilter,
       segmentIds
     )
     // filter
     val filterOption = if (directScanSupport && CarbonToSparkAdapter
         .supportsBatchOrColumnar(scan)) {
-      allPredicates.reduceLeftOption(expressions.And)
+      filterPredicates.reduceLeftOption(expressions.And)
     } else if (extraSegments.nonEmpty) {
-      allPredicates.reduceLeftOption(expressions.And)
+      filterPredicates.reduceLeftOption(expressions.And)
     } else {
       filterCondition
     }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
index 8dbfc5a..9236e1e 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
@@ -374,9 +374,10 @@ object DMLStrategy extends SparkStrategy {
     def pushLimit(limit: Int, plan: LogicalPlan): LogicalPlan = {
       val newPlan = plan transform {
         case lr: LogicalRelation =>
-          val newRelation = lr.copy(relation = lr.relation
+          val relation = lr.relation
             .asInstanceOf[CarbonDatasourceHadoopRelation]
-            .copy(limit = limit))
+          relation.setLimit(limit)
+          val newRelation = lr.copy(relation = relation)
           newRelation
         case other => other
       }
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index dc32eac..4186405 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -559,7 +559,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
           case None =>
             CarbonEnv.getTablePath(tableIdentifier.database, 
tableIdentifier.table)(sparkSession)
         }
-        CarbonDatasourceHadoopRelation(sparkSession,
+        new CarbonDatasourceHadoopRelation(sparkSession,
           Array(tableLocation.asInstanceOf[String]),
           catalogTable.storage.properties,
           Option(catalogTable.schema))
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 09bdf18..2dd8c23 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -675,11 +675,11 @@ object CarbonFilters {
     if (!carbonTable.isHivePartitionTable) {
       return (null, null, partitionFilters)
     }
-    val partititions = getCatalogTablePartitions(partitionFilters, 
sparkSession, carbonTable)
-    if (partititions.isEmpty) {
+    val partitions = getCatalogTablePartitions(partitionFilters, sparkSession, 
carbonTable)
+    if (partitions.isEmpty) {
       (Seq.empty, Seq.empty, partitionFilters)
     } else {
-      (partititions, convertToPartitionSpec(partititions), partitionFilters)
+      (partitions, convertToPartitionSpec(partitions), partitionFilters)
     }
   }
 }
diff --git 
a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
 
b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
index 8fe14e6..20a8f5b 100644
--- 
a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
+++ 
b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonDataSourceScanHelper.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql
 import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => 
SparkExpression}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression => 
SparkExpression, PlanExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.execution.{DataSourceScanExec, 
WholeStageCodegenExec}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.strategy.CarbonPlanHelper
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import org.apache.carbondata.core.index.IndexFilter
@@ -35,14 +37,16 @@ import 
org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
-abstract class  CarbonDataSourceScanHelper(relation: 
CarbonDatasourceHadoopRelation,
-  output: Seq[Attribute],
-  partitionFilters: Seq[SparkExpression],
-  pushedDownFilters: Seq[Expression],
-  pushedDownProjection: CarbonProjection,
-  directScanSupport: Boolean,
-  extraRDD: Option[(RDD[InternalRow], Boolean)],
-  segmentIds: Option[String])
+abstract class CarbonDataSourceScanHelper(relation: 
CarbonDatasourceHadoopRelation,
+    output: Seq[Attribute],
+    partitionFiltersWithoutDpp: Seq[SparkExpression],
+    pushedDownFilters: Seq[Expression],
+    pushedDownProjection: CarbonProjection,
+    directScanSupport: Boolean,
+    extraRDD: Option[(RDD[InternalRow], Boolean)],
+    selectedCatalogPartitions: Seq[CatalogTablePartition],
+    partitionFiltersWithDpp: Seq[SparkExpression],
+    segmentIds: Option[String])
   extends DataSourceScanExec {
 
   override lazy val supportsColumnar: Boolean = CarbonPlanHelper
@@ -63,7 +67,7 @@ abstract class  CarbonDataSourceScanHelper(relation: 
CarbonDatasourceHadoopRelat
       .map(new IndexFilter(relation.carbonTable, _, true)).orNull
     if (filter != null && pushedDownFilters.length == 1) {
       // push down the limit if only one filter
-      filter.setLimit(relation.limit)
+      filter.setLimit(relation.getLimit)
     }
     filter
   }
@@ -89,11 +93,13 @@ abstract class  CarbonDataSourceScanHelper(relation: 
CarbonDatasourceHadoopRelat
 
   @transient lazy val selectedPartitions: Seq[PartitionSpec] = {
     CarbonFilters
-      .getPartitions(partitionFilters, relation.sparkSession, 
relation.carbonTable)
+      .getPartitions(partitionFiltersWithoutDpp, relation.sparkSession, 
relation.carbonTable)
       .orNull
   }
 
   lazy val inputRDD: RDD[InternalRow] = {
+    val dynamicFilter = partitionFiltersWithDpp.filter(exp =>
+      exp.find(_.isInstanceOf[PlanExpression[_]]).isDefined)
     val carbonRdd = new CarbonScanRDD[InternalRow](
       relation.sparkSession,
       pushedDownProjection,
@@ -104,6 +110,21 @@ abstract class  CarbonDataSourceScanHelper(relation: 
CarbonDatasourceHadoopRelat
       new CarbonInputMetrics,
       selectedPartitions,
       segmentIds = segmentIds)
+    if(dynamicFilter.nonEmpty) {
+      carbonRdd match {
+        case carbonRdd: CarbonScanRDD[InternalRow] =>
+          // prune dynamic partitions based on filter
+          val sparkExpression = SparkSQLUtil.getSparkSession
+          val runtimePartitions = ExternalCatalogUtils.prunePartitionsByFilter(
+            
sparkExpression.sessionState.catalog.getTableMetadata(tableIdentifier.get),
+            selectedCatalogPartitions,
+            dynamicFilter,
+            sparkExpression.sessionState.conf.sessionLocalTimeZone
+          )
+          // set partitions to carbon rdd
+          carbonRdd.partitionNames = 
CarbonFilters.convertToPartitionSpec(runtimePartitions)
+      }
+    }
     carbonRdd.setVectorReaderSupport(supportsColumnar)
     carbonRdd.setDirectScanSupport(supportsColumnar && directScanSupport)
     extraRDD.map(_._1.union(carbonRdd)).getOrElse(carbonRdd)
diff --git 
a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
 
b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 2d60667..25a27ad 100644
--- 
a/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ 
b/integration/spark/src/main/spark3.1/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, 
SparkListenerApplicationEnd}
 import 
org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
ExternalCatalogWithListener}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, AttributeSet, Expression, ExprId, NamedExpression, 
Predicate, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, AttributeSet, DynamicPruningSubquery, Expression, ExprId, 
NamedExpression, Predicate, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
@@ -174,14 +174,16 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
       partitionSet: AttributeSet,
       filterPredicates: Seq[Expression]): Seq[Expression] = {
     filterPredicates
-      .filterNot(SubqueryExpression.hasSubquery)
       .filter { filter =>
         filter.references.nonEmpty && filter.references.subsetOf(partitionSet)
       }
   }
 
   def getDataFilter(partitionSet: AttributeSet, filter: Seq[Expression]): 
Seq[Expression] = {
-    filter
+    filter.filter {
+      case _: DynamicPruningSubquery => false
+      case _ => true
+    }
   }
 
   // As per SPARK-22520 OptimizeCodegen is removed in 2.3.1
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
index 8c9b7b1..50c70e5 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dblocation/DBLocationCarbonTableTestCase.scala
@@ -268,7 +268,7 @@ class DBLocationCarbonTableTestCase extends QueryTest with 
BeforeAndAfterEach {
   }
 
   test("Alter table drop column test") {
-    sql(s"create database carbon location '$dbLocation'")
+    sql(s"create database carbon location '$dbLocation/newdb/'")
     sql("use carbon")
     sql(
       """create table carbon.carbontable (
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningTestCase.scala
new file mode 100644
index 0000000..cc9ff36
--- /dev/null
+++ 
b/integration/spark/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningTestCase.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.util.SparkUtil
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+class DynamicPartitionPruningTestCase extends QueryTest with 
BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+    sql("drop table if exists dpp_table1")
+    sql("drop table if exists dpp_table2")
+  }
+
+  override protected def afterEach(): Unit = {
+    sql("drop table if exists dpp_table1")
+    sql("drop table if exists dpp_table2")
+  }
+
+  test("test partition pruning: carbon join other format table") {
+    sql("create table dpp_table1(col1 int) partitioned by (col2 string) stored 
as carbondata")
+    sql("insert into dpp_table1 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+    sql("create table dpp_table2(col1 int, col2 string) ")
+    sql("insert into dpp_table2 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+
+    // partition table without filter
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col1 in (1,2)",
+      "dpp_table1",
+      2,
+      4)
+
+    // partition table with filter on partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col2 in ('b','c') and t2.col1 in (1,2)",
+      "dpp_table1",
+      1,
+      2)
+
+    // partition table with filter on normal column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t2.col1 in (1,2)",
+      "dpp_table1",
+      2,
+      4)
+
+    // partition table with filter on normal column and partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t1.col2 in ('b', 'c') 
and t2.col1 in (1,2)",
+      "dpp_table1",
+      1,
+      2)
+  }
+
+  test("test partition pruning: carbon join format table") {
+    sql("create table dpp_table1(col1 int) partitioned by (col2 string) stored 
as carbondata")
+    sql("insert into dpp_table1 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+    sql("create table dpp_table2(col1 int, col2 string) stored as carbondata")
+    sql("insert into dpp_table2 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+
+    // partition table without filter
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col1 in (1,2)",
+      "dpp_table1",
+      2,
+      4)
+
+    // partition table with filter on partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col2 in ('b','c') and t2.col1 in (1,2)",
+      "dpp_table1",
+      1,
+      2)
+
+    // partition table with filter on normal column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t2.col1 in (1,2)",
+      "dpp_table1",
+      2,
+      4)
+
+    // partition table with filter on normal column and partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t1.col2 in ('b', 'c') 
and t2.col1 in (1,2)",
+      "dpp_table1",
+      1,
+      2)
+  }
+
+  test("test partition pruning: partitioned table join partitioned table") {
+    sql("create table dpp_table1(col1 int) partitioned by (col2 string) stored 
as carbondata")
+    sql("insert into dpp_table1 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+    sql("create table dpp_table2(col1 int) partitioned by (col2 string) stored 
as carbondata")
+    sql("insert into dpp_table2 values(1, 'a'),(2, 'b'),(3, 'c'),(4, 'd')")
+
+    // partition table without filter
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col1 in (1,2)",
+      "dpp_table1",
+      2,
+      4)
+
+    // right table without filter
+    verifyPartitionPruning(
+      "select /** BROADCAST(t1) */ t1.col1 from dpp_table1 t1, dpp_table2 t2 " 
+
+      "where t1.col2=t2.col2 and t1.col1 in (1,2)",
+      "dpp_table2",
+      4,
+      4)
+
+    // left table with filter on partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t1.col2 in ('b', 'c') and t2.col1 in (1,2)",
+      "dpp_table1",
+      1,
+      2)
+
+    // right table with filter on partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col2 in ('b', 'c') and t1.col1 in (1,2)",
+      "dpp_table2",
+      2,
+      2,
+      false)
+
+    // left table with filter on normal column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t2.col1 in (1,2)",
+      "dpp_table1",
+      2,
+      4)
+
+    // right table with filter on normal column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col1 in (2, 3) and t1.col1 in (1,2)",
+      "dpp_table2",
+      4,
+      4,
+      false)
+
+    // left table with filter on normal column and partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t1.col1 in (2, 3) and t1.col2 in ('b', 'c') 
and t2.col1 in (1,2)",
+      "dpp_table1",
+      1,
+      2)
+
+    // right table with filter on normal column and partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and t2.col1 in (2, 3) and t2.col2 in ('b', 'c') 
and t1.col1 in (1,2)",
+      "dpp_table2",
+      2,
+      2,
+      false)
+
+    // both tables with filter on normal column and partition column
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and  t1.col1 in (2, 3) and t1.col2 in ('a', 'b', 
'c') and " +
+      "t2.col1 = 2 and t2.col2 in ('a', 'b')",
+      "dpp_table1",
+      1,
+      2)
+
+    verifyPartitionPruning(
+      "select t1.col1 from dpp_table1 t1, dpp_table2 t2 " +
+      "where t1.col2=t2.col2 and  t2.col1 in (2, 3) and t2.col2 in ('a', 'b', 
'c') and " +
+      "t1.col1 = 2 and t2.col2 in ('a', 'b')",
+      "dpp_table2",
+      2,
+      2,
+      false)
+  }
+
+  private def verifyPartitionPruning(sqlText: String,
+      tableName: String,
+      dynamicPartitionPruning: Int,
+      staticPartitionPruning: Int,
+      hasDynamicPruning: Boolean = true): Unit = {
+    if (SPARK_VERSION.startsWith("3")) {
+      val df = sql(sqlText)
+      df.collect()
+      val ds = df.queryExecution.executedPlan.find { plan =>
+        plan.isInstanceOf[CarbonDataSourceScan] &&
+        
plan.asInstanceOf[CarbonDataSourceScan].tableIdentifier.get.table.equals(tableName)
+      }
+      val carbonDs = ds.get.asInstanceOf[CarbonDataSourceScan]
+      val carbonRdd = 
carbonDs.inputRDDs().head.asInstanceOf[CarbonScanRDD[InternalRow]]
+      assert(carbonDs.metadata.contains("PartitionFilters"))
+      if (SparkUtil.isSparkVersionXAndAbove("2.4")) {
+        if (hasDynamicPruning) {
+          
assert(carbonDs.metadata("PartitionFilters").contains("dynamicpruning"))
+        }
+        assertResult(dynamicPartitionPruning)(carbonRdd.partitionNames.size)
+      } else {
+        assertResult(staticPartitionPruning)(carbonRdd.partitionNames.size)
+      }
+    }
+  }
+}

Reply via email to