This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8e77b1ea4 [spark] Use Seq in the PaimonScan's constructor to ensure equals (#3185)
8e77b1ea4 is described below
commit 8e77b1ea426fd7268ecbb872e9d7c17f67d438e6
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Apr 10 10:10:04 2024 +0800
[spark] Use Seq in the PaimonScan's constructor to ensure equals (#3185)
---
.../scala/org/apache/paimon/spark/PaimonScan.scala | 4 ++--
.../org/apache/paimon/spark/PaimonBaseScan.scala | 4 ++--
.../scala/org/apache/paimon/spark/PaimonScan.scala | 4 ++--
.../MergePaimonScalarSubqueriersBase.scala | 2 +-
.../spark/statistics/StatisticsHelperBase.scala | 6 ++----
.../spark/sql/PaimonOptimizationTestBase.scala | 21 +++++++++++++++++++++
6 files changed, 30 insertions(+), 11 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 2bc75ad8f..24dfb342a 100644
--- a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
case class PaimonScan(
table: Table,
requiredSchema: StructType,
- filters: Array[Predicate],
- reservedFilters: Array[Filter],
+ filters: Seq[Predicate],
+ reservedFilters: Seq[Filter],
pushDownLimit: Option[Int])
extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters,
pushDownLimit)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 9ca952de5..a5ba88723 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -39,8 +39,8 @@ import scala.collection.JavaConverters._
abstract class PaimonBaseScan(
table: Table,
requiredSchema: StructType,
- filters: Array[Predicate],
- reservedFilters: Array[Filter],
+ filters: Seq[Predicate],
+ reservedFilters: Seq[Filter],
pushDownLimit: Option[Int])
extends Scan
with SupportsReportStatistics
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 7f1900e2d..eb6bb10ee 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -32,8 +32,8 @@ import scala.collection.JavaConverters._
case class PaimonScan(
table: Table,
requiredSchema: StructType,
- filters: Array[Predicate],
- reservedFilters: Array[Filter],
+ filters: Seq[Predicate],
+ reservedFilters: Seq[Filter],
pushDownLimit: Option[Int])
extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters,
pushDownLimit)
with SupportsRuntimeFiltering {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
index 69ac3e3d8..45a086d09 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
@@ -267,7 +267,7 @@ trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with PredicateH
protected def mergePaimonScan(scan1: PaimonScan, scan2: PaimonScan):
Option[PaimonScan] = {
if (
scan1.table == scan2.table &&
- scan1.filters.sameElements(scan2.filters) &&
+ scan1.filters == scan2.filters &&
scan1.pushDownLimit == scan2.pushDownLimit
) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
index 275f17fe8..17eadf4f2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
@@ -38,7 +38,7 @@ trait StatisticsHelperBase extends SQLConfHelper {
val requiredSchema: StructType
- def filterStatistics(v2Stats: Statistics, filters: Array[Filter]):
Statistics = {
+ def filterStatistics(v2Stats: Statistics, filters: Seq[Filter]): Statistics
= {
val attrs: Seq[AttributeReference] =
requiredSchema.map(f => AttributeReference(f.name, f.dataType,
f.nullable, f.metadata)())
val condition = filterToCondition(filters, attrs)
@@ -52,9 +52,7 @@ trait StatisticsHelperBase extends SQLConfHelper {
}
}
- private def filterToCondition(
- filters: Array[Filter],
- attrs: Seq[Attribute]): Option[Expression] = {
+ private def filterToCondition(filters: Seq[Filter], attrs: Seq[Attribute]):
Option[Expression] = {
StructFilters.filterToExpression(filters.reduce(And), toRef).map {
expression =>
expression.transform {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 89bf3938e..1f4370045 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -24,7 +24,9 @@ import
org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
import org.apache.spark.sql.catalyst.expressions.{Attribute,
CreateNamedStruct, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef,
LogicalPlan, OneRowRelation, WithCTE}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions
import scala.collection.immutable
@@ -91,6 +93,25 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase {
}
}
+ test("Paimon Optimization: paimon scan equals") {
+ withTable("T") {
+ spark.sql(s"CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED
BY (pt)")
+ spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3,
'c', 'p2')")
+
+ val sqlText = "SELECT * FROM T WHERE id = 1 AND pt = 'p1' LIMIT 1"
+ def getPaimonScan(sqlText: String) = {
+ spark
+ .sql(sqlText)
+ .queryExecution
+ .optimizedPlan
+ .collectFirst { case relation: DataSourceV2ScanRelation => relation }
+ .get
+ .scan
+ }
+ Assertions.assertEquals(getPaimonScan(sqlText), getPaimonScan(sqlText))
+ }
+ }
+
private def definitionNode(plan: LogicalPlan, cteIndex: Int) = {
CTERelationDef(plan, cteIndex, underSubquery = true)
}