This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bef31298 [VL] Refactor data filter in scan transformer (#5812)
3bef31298 is described below

commit 3bef31298e53c6c23cdb7e0fc09f60d655f5a505
Author: 高阳阳 <[email protected]>
AuthorDate: Tue May 21 17:01:54 2024 +0800

    [VL] Refactor data filter in scan transformer (#5812)
    
    [VL] Refactor data filter in scan transformer.
---
 .../src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala      | 2 ++
 .../scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala     | 2 ++
 .../scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala     | 2 ++
 .../scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala     | 4 ++++
 .../org/apache/spark/sql/execution/FileSourceScanExecShim.scala      | 5 ++++-
 .../scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala     | 5 +++++
 .../org/apache/spark/sql/execution/FileSourceScanExecShim.scala      | 4 ++--
 7 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index fd8cd24c3..d6acc8c27 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -193,6 +193,8 @@ trait SparkShims {
 
   def getFileStatus(partition: PartitionDirectory): Seq[FileStatus]
 
+  def isRowIndexMetadataColumn(name: String): Boolean
+
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatus,
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 97251a7ef..29fddc697 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -189,6 +189,8 @@ class Spark32Shims extends SparkShims {
 
   def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files
 
+  def isRowIndexMetadataColumn(name: String): Boolean = false
+
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatus,
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 58dc0a00c..7c6ce644d 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -273,6 +273,8 @@ class Spark33Shims extends SparkShims {
 
   def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files
 
+  def isRowIndexMetadataColumn(name: String): Boolean = false
+
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatus,
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 6d70d67f3..4ab307e85 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -327,6 +327,10 @@ class Spark34Shims extends SparkShims {
 
   def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files
 
+  def isRowIndexMetadataColumn(name: String): Boolean = {
+    name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+  }
+
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatus,
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 15455d51c..33df953f3 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -56,7 +57,9 @@ abstract class FileSourceScanExecShim(
     case FileSourceGeneratedMetadataAttribute(attr) => attr
   }
 
-  def dataFiltersInScan: Seq[Expression] = dataFilters
+  def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists {
+    attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
+  })
 
   def hasUnsupportedColumns: Boolean = {
     val metadataColumnsNames = metadataColumns.map(_.name)
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 00f9d62fd..ef1cea865 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Sca
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -355,6 +356,10 @@ class Spark35Shims extends SparkShims {
   def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] =
     partition.files.map(_.fileStatus)
 
+  def isRowIndexMetadataColumn(name: String): Boolean = {
+    name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+  }
+
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatus,
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 6295bcbc4..dccf1bbce 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -59,8 +60,7 @@ abstract class FileSourceScanExecShim(
   protected lazy val driverMetricsAlias = driverMetrics
 
   def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists {
-    case FileSourceMetadataAttribute(_) => true
-    case _ => false
+    attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
   })
 
   def hasUnsupportedColumns: Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to