This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3bef31298 [VL] Refactor data filter in scan transformer (#5812)
3bef31298 is described below
commit 3bef31298e53c6c23cdb7e0fc09f60d655f5a505
Author: 高阳阳 <[email protected]>
AuthorDate: Tue May 21 17:01:54 2024 +0800
[VL] Refactor data filter in scan transformer (#5812)
[VL] Refactor data filter in scan transformer.
---
.../src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala | 2 ++
.../scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala | 2 ++
.../scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala | 2 ++
.../scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala | 4 ++++
.../org/apache/spark/sql/execution/FileSourceScanExecShim.scala | 5 ++++-
.../scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala | 5 +++++
.../org/apache/spark/sql/execution/FileSourceScanExecShim.scala | 4 ++--
7 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index fd8cd24c3..d6acc8c27 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -193,6 +193,8 @@ trait SparkShims {
def getFileStatus(partition: PartitionDirectory): Seq[FileStatus]
+ def isRowIndexMetadataColumn(name: String): Boolean
+
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 97251a7ef..29fddc697 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -189,6 +189,8 @@ class Spark32Shims extends SparkShims {
def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] =
partition.files
+ def isRowIndexMetadataColumn(name: String): Boolean = false
+
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 58dc0a00c..7c6ce644d 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -273,6 +273,8 @@ class Spark33Shims extends SparkShims {
def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] =
partition.files
+ def isRowIndexMetadataColumn(name: String): Boolean = false
+
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 6d70d67f3..4ab307e85 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -327,6 +327,10 @@ class Spark34Shims extends SparkShims {
def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] =
partition.files
+ def isRowIndexMetadataColumn(name: String): Boolean = {
+ name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ }
+
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 15455d51c..33df953f3 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -56,7 +57,9 @@ abstract class FileSourceScanExecShim(
case FileSourceGeneratedMetadataAttribute(attr) => attr
}
- def dataFiltersInScan: Seq[Expression] = dataFilters
+ def dataFiltersInScan: Seq[Expression] =
dataFilters.filterNot(_.references.exists {
+ attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
+ })
def hasUnsupportedColumns: Boolean = {
val metadataColumnsNames = metadataColumns.map(_.name)
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 00f9d62fd..ef1cea865 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Sca
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
@@ -355,6 +356,10 @@ class Spark35Shims extends SparkShims {
def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] =
partition.files.map(_.fileStatus)
+ def isRowIndexMetadataColumn(name: String): Boolean = {
+ name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ }
+
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 6295bcbc4..dccf1bbce 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -59,8 +60,7 @@ abstract class FileSourceScanExecShim(
protected lazy val driverMetricsAlias = driverMetrics
def dataFiltersInScan: Seq[Expression] =
dataFilters.filterNot(_.references.exists {
- case FileSourceMetadataAttribute(_) => true
- case _ => false
+ attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
})
def hasUnsupportedColumns: Boolean = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]