This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e77b1ea4 [spark] Use Seq in the PaimonScan's constructor to ensure 
equals (#3185)
8e77b1ea4 is described below

commit 8e77b1ea426fd7268ecbb872e9d7c17f67d438e6
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Apr 10 10:10:04 2024 +0800

    [spark] Use Seq in the PaimonScan's constructor to ensure equals (#3185)
---
 .../scala/org/apache/paimon/spark/PaimonScan.scala  |  4 ++--
 .../org/apache/paimon/spark/PaimonBaseScan.scala    |  4 ++--
 .../scala/org/apache/paimon/spark/PaimonScan.scala  |  4 ++--
 .../MergePaimonScalarSubqueriersBase.scala          |  2 +-
 .../spark/statistics/StatisticsHelperBase.scala     |  6 ++----
 .../spark/sql/PaimonOptimizationTestBase.scala      | 21 +++++++++++++++++++++
 6 files changed, 30 insertions(+), 11 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 2bc75ad8f..24dfb342a 100644
--- 
a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
 case class PaimonScan(
     table: Table,
     requiredSchema: StructType,
-    filters: Array[Predicate],
-    reservedFilters: Array[Filter],
+    filters: Seq[Predicate],
+    reservedFilters: Seq[Filter],
     pushDownLimit: Option[Int])
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 9ca952de5..a5ba88723 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -39,8 +39,8 @@ import scala.collection.JavaConverters._
 abstract class PaimonBaseScan(
     table: Table,
     requiredSchema: StructType,
-    filters: Array[Predicate],
-    reservedFilters: Array[Filter],
+    filters: Seq[Predicate],
+    reservedFilters: Seq[Filter],
     pushDownLimit: Option[Int])
   extends Scan
   with SupportsReportStatistics
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 7f1900e2d..eb6bb10ee 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -32,8 +32,8 @@ import scala.collection.JavaConverters._
 case class PaimonScan(
     table: Table,
     requiredSchema: StructType,
-    filters: Array[Predicate],
-    reservedFilters: Array[Filter],
+    filters: Seq[Predicate],
+    reservedFilters: Seq[Filter],
     pushDownLimit: Option[Int])
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
   with SupportsRuntimeFiltering {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
index 69ac3e3d8..45a086d09 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
@@ -267,7 +267,7 @@ trait MergePaimonScalarSubqueriersBase extends 
Rule[LogicalPlan] with PredicateH
   protected def mergePaimonScan(scan1: PaimonScan, scan2: PaimonScan): 
Option[PaimonScan] = {
     if (
       scan1.table == scan2.table &&
-      scan1.filters.sameElements(scan2.filters) &&
+      scan1.filters == scan2.filters &&
       scan1.pushDownLimit == scan2.pushDownLimit
     ) {
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
index 275f17fe8..17eadf4f2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
@@ -38,7 +38,7 @@ trait StatisticsHelperBase extends SQLConfHelper {
 
   val requiredSchema: StructType
 
-  def filterStatistics(v2Stats: Statistics, filters: Array[Filter]): 
Statistics = {
+  def filterStatistics(v2Stats: Statistics, filters: Seq[Filter]): Statistics 
= {
     val attrs: Seq[AttributeReference] =
       requiredSchema.map(f => AttributeReference(f.name, f.dataType, 
f.nullable, f.metadata)())
     val condition = filterToCondition(filters, attrs)
@@ -52,9 +52,7 @@ trait StatisticsHelperBase extends SQLConfHelper {
     }
   }
 
-  private def filterToCondition(
-      filters: Array[Filter],
-      attrs: Seq[Attribute]): Option[Expression] = {
+  private def filterToCondition(filters: Seq[Filter], attrs: Seq[Attribute]): 
Option[Expression] = {
     StructFilters.filterToExpression(filters.reduce(And), toRef).map {
       expression =>
         expression.transform {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 89bf3938e..1f4370045 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -24,7 +24,9 @@ import 
org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
CreateNamedStruct, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, 
LogicalPlan, OneRowRelation, WithCTE}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions
 
 import scala.collection.immutable
 
@@ -91,6 +93,25 @@ abstract class PaimonOptimizationTestBase extends 
PaimonSparkTestBase {
     }
   }
 
+  test("Paimon Optimization: paimon scan equals") {
+    withTable("T") {
+      spark.sql(s"CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED 
BY (pt)")
+      spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 
'c', 'p2')")
+
+      val sqlText = "SELECT * FROM T WHERE id = 1 AND pt = 'p1' LIMIT 1"
+      def getPaimonScan(sqlText: String) = {
+        spark
+          .sql(sqlText)
+          .queryExecution
+          .optimizedPlan
+          .collectFirst { case relation: DataSourceV2ScanRelation => relation }
+          .get
+          .scan
+      }
+      Assertions.assertEquals(getPaimonScan(sqlText), getPaimonScan(sqlText))
+    }
+  }
+
   private def definitionNode(plan: LogicalPlan, cteIndex: Int) = {
     CTERelationDef(plan, cteIndex, underSubquery = true)
   }

Reply via email to