This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0bb41c742f [GLUTEN-7174][VL] Force fallback scan operator when spark.sql.parquet.mergeSchema enabled (#7634)
0bb41c742f is described below

commit 0bb41c742ff3adbe16263db9752dca8e64649d20
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Nov 3 21:24:06 2024 +0800

    [GLUTEN-7174][VL] Force fallback scan operator when spark.sql.parquet.mergeSchema enabled (#7634)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   5 +-
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 142 ++++++++++++---------
 .../gluten/backendsapi/BackendSettingsApi.scala    |   5 +-
 .../execution/BasicScanExecTransformer.scala       |   2 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   3 -
 .../parquet/GlutenParquetFilterSuite.scala         |  49 -------
 .../gluten/utils/velox/VeloxTestSettings.scala     |   6 -
 .../parquet/GlutenParquetFilterSuite.scala         |  49 -------
 .../gluten/utils/velox/VeloxTestSettings.scala     |   6 -
 .../parquet/GlutenParquetFilterSuite.scala         |  49 -------
 .../gluten/utils/velox/VeloxTestSettings.scala     |   6 -
 .../parquet/GlutenParquetFilterSuite.scala         |  49 -------
 12 files changed, 86 insertions(+), 285 deletions(-)
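
Context: with this change the Velox backend declines to offload a Parquet scan whenever schema merging is requested, so the scan operator falls back to vanilla Spark. A hedged sketch of how the fallback can be observed from a Spark session (the app name and paths are illustrative, not part of the commit):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("merge-schema-fallback-sketch") // hypothetical app name
      .getOrCreate()

    // With mergeSchema enabled, the scan should no longer be offloaded to Velox
    // (see https://github.com/apache/incubator-gluten/issues/7174).
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")
    val df = spark.read.parquet("/tmp/table1", "/tmp/table2") // hypothetical paths
    df.filter("c = 1").explain() // expect a vanilla FileSourceScanExec, not a Gluten scan transformer
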

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 59d912d8e7..9a1b00f714 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -142,10 +142,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       .toLowerCase(Locale.getDefault)
   }
 
-  override def validateScan(
+  override def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
-      rootPaths: Seq[String]): ValidationResult = {
+      rootPaths: Seq[String],
+      properties: Map[String, String]): ValidationResult = {
 
     // Validate if all types are supported.
     def hasComplexType: Boolean = {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 251b93cc7c..539059cdb6 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -35,12 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, NamedExpression, NthValue, NTile, PercentRank, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary, SpecifiedWindowFrame}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Percentile}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, SparkPlan}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -88,78 +88,94 @@ object VeloxBackendSettings extends BackendSettingsApi {
   val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".internal.udfLibraryPaths"
   val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX + ".udfAllowTypeConversion"
 
-  val MAXIMUM_BATCH_SIZE: Int = 32768
-
-  override def validateScan(
+  override def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
-      rootPaths: Seq[String]): ValidationResult = {
-    val filteredRootPaths = distinctRootPaths(rootPaths)
-    if (
-      filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
-        .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
-    ) {
-      return ValidationResult.failed(
-        s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
-    }
-    // Validate if all types are supported.
-    def validateTypes(validatorFunc: PartialFunction[StructField, String]): ValidationResult = {
-      // Collect unsupported types.
-      val unsupportedDataTypeReason = fields.collect(validatorFunc)
-      if (unsupportedDataTypeReason.isEmpty) {
-        ValidationResult.succeeded
+      rootPaths: Seq[String],
+      properties: Map[String, String]): ValidationResult = {
+
+    def validateScheme(): Option[String] = {
+      val filteredRootPaths = distinctRootPaths(rootPaths)
+      if (
+        filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
+          .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
+      ) {
+        Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
       } else {
-        ValidationResult.failed(
-          s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
+        None
       }
     }
 
-    format match {
-      case ParquetReadFormat =>
-        val typeValidator: PartialFunction[StructField, String] = {
-          // Parquet timestamp is not fully supported yet
-          case StructField(_, TimestampType, _, _)
-              if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
-            "TimestampType"
-        }
-        validateTypes(typeValidator)
-      case DwrfReadFormat => ValidationResult.succeeded
-      case OrcReadFormat =>
-        if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
-          ValidationResult.failed(s"Velox ORC scan is turned off.")
+    def validateFormat(): Option[String] = {
+      def validateTypes(validatorFunc: PartialFunction[StructField, String]): Option[String] = {
+        // Collect unsupported types.
+        val unsupportedDataTypeReason = fields.collect(validatorFunc)
+        if (unsupportedDataTypeReason.nonEmpty) {
+          Some(
+            s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
         } else {
+          None
+        }
+      }
+
+      def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
+        val charTypePattern = "char\\((\\d+)\\)".r
+        GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern
+          .findFirstIn(
+            CharVarcharUtils
+              .getRawTypeString(metadata)
+              .getOrElse(stringType.catalogString))
+          .isDefined
+      }
+
+      format match {
+        case ParquetReadFormat =>
           val typeValidator: PartialFunction[StructField, String] = {
-            case StructField(_, arrayType: ArrayType, _, _)
-                if arrayType.elementType.isInstanceOf[StructType] =>
-              "StructType as element in ArrayType"
-            case StructField(_, arrayType: ArrayType, _, _)
-                if arrayType.elementType.isInstanceOf[ArrayType] =>
-              "ArrayType as element in ArrayType"
-            case StructField(_, mapType: MapType, _, _)
-                if mapType.keyType.isInstanceOf[StructType] =>
-              "StructType as Key in MapType"
-            case StructField(_, mapType: MapType, _, _)
-                if mapType.valueType.isInstanceOf[ArrayType] =>
-              "ArrayType as Value in MapType"
-            case StructField(_, stringType: StringType, _, metadata)
-                if isCharType(stringType, metadata) =>
-              CharVarcharUtils.getRawTypeString(metadata) + " not support"
-            case StructField(_, TimestampType, _, _) => "TimestampType not support"
+            // Parquet timestamp is not fully supported yet
+            case StructField(_, TimestampType, _, _)
+                if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
+              "TimestampType(force fallback)"
           }
-          validateTypes(typeValidator)
-        }
-      case _ => ValidationResult.failed(s"Unsupported file format for $format.")
+          val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
+          if (parquetOptions.mergeSchema) {
+            // https://github.com/apache/incubator-gluten/issues/7174
+            Some(s"not support when merge schema is true")
+          } else {
+            validateTypes(typeValidator)
+          }
+        case DwrfReadFormat => None
+        case OrcReadFormat =>
+          if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
+            Some(s"Velox ORC scan is turned off, ${GlutenConfig.VELOX_ORC_SCAN_ENABLED.key}")
+          } else {
+            val typeValidator: PartialFunction[StructField, String] = {
+              case StructField(_, arrayType: ArrayType, _, _)
+                  if arrayType.elementType.isInstanceOf[StructType] =>
+                "StructType as element in ArrayType"
+              case StructField(_, arrayType: ArrayType, _, _)
+                  if arrayType.elementType.isInstanceOf[ArrayType] =>
+                "ArrayType as element in ArrayType"
+              case StructField(_, mapType: MapType, _, _)
+                  if mapType.keyType.isInstanceOf[StructType] =>
+                "StructType as Key in MapType"
+              case StructField(_, mapType: MapType, _, _)
+                  if mapType.valueType.isInstanceOf[ArrayType] =>
+                "ArrayType as Value in MapType"
+              case StructField(_, stringType: StringType, _, metadata)
+                  if isCharType(stringType, metadata) =>
+                CharVarcharUtils.getRawTypeString(metadata) + "(force fallback)"
+              case StructField(_, TimestampType, _, _) => "TimestampType"
+            }
+            validateTypes(typeValidator)
+          }
+        case _ => Some(s"Unsupported file format for $format.")
+      }
     }
-  }
 
-  def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
-    val charTypePattern = "char\\((\\d+)\\)".r
-    GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern
-      .findFirstIn(
-        CharVarcharUtils
-          .getRawTypeString(metadata)
-          .getOrElse(stringType.catalogString))
-      .isDefined
+    validateScheme().orElse(validateFormat()) match {
+      case Some(reason) => ValidationResult.failed(reason)
+      case _ => ValidationResult.succeeded
+    }
   }
 
   def distinctRootPaths(paths: Seq[String]): Seq[String] = {
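
Note on the new check above: it reuses Spark's own ParquetOptions, so both a per-read "mergeSchema" option and the session-level spark.sql.parquet.mergeSchema conf trigger the fallback. A standalone sketch of the detection (mergeSchemaRequested is a hypothetical helper name; the Spark classes are the ones the diff imports):

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
    import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
    import org.apache.spark.sql.internal.SQLConf

    // ParquetOptions resolves "mergeSchema" from the relation's options first and
    // falls back to SQLConf.PARQUET_SCHEMA_MERGING_ENABLED when the key is absent.
    def mergeSchemaRequested(properties: Map[String, String]): Boolean =
      new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get).mergeSchema

    // mergeSchemaRequested(Map("mergeSchema" -> "true"))  => true
    // mergeSchemaRequested(Map.empty)                     => current session default
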
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index f1f46dd87e..177d19c0c7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -30,10 +30,11 @@ import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF
 import org.apache.spark.sql.types.StructField
 
 trait BackendSettingsApi {
-  def validateScan(
+  def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
-      rootPaths: Seq[String]): ValidationResult = ValidationResult.succeeded
+      rootPaths: Seq[String],
+      properties: Map[String, String]): ValidationResult = ValidationResult.succeeded
 
   def supportWriteFilesExec(
       format: FileFormat,
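
For backend implementors, the renamed hook now receives the relation's options, so a scan can be vetoed on properties as well as on schema and root paths. A minimal hypothetical override (the object name and rejection message are illustrative; import paths follow the Gluten source tree as an assumption):

    import org.apache.gluten.backendsapi.BackendSettingsApi
    import org.apache.gluten.extension.ValidationResult
    import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
    import org.apache.spark.sql.types.StructField

    object SketchBackendSettings extends BackendSettingsApi {
      override def validateScanExec(
          format: ReadFileFormat,
          fields: Array[StructField],
          rootPaths: Seq[String],
          properties: Map[String, String]): ValidationResult = {
        // Veto the offload when the relation carries mergeSchema=true.
        if (properties.get("mergeSchema").exists(_.equalsIgnoreCase("true"))) {
          ValidationResult.failed("merged Parquet schema is not supported")
        } else {
          ValidationResult.succeeded
        }
      }
    }
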
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index d7b824b397..f272dc3eca 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -95,7 +95,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
     }
 
     val validationResult = BackendsApiManager.getSettings
-      .validateScan(fileFormat, fields, getRootFilePaths)
+      .validateScanExec(fileFormat, fields, getRootFilePaths, getProperties)
     if (!validationResult.ok()) {
       return validationResult
     }
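
The call site above now threads the relation's options (getProperties) through to the backend. A hedged sketch of how a ValidationResult is consumed, assuming the ok()/reason() accessors used elsewhere in Gluten:

    // Hypothetical consumer mirroring the transformer's pattern.
    def offloadOrFallback(result: ValidationResult): String =
      if (result.ok()) "offload scan to the native backend"
      else s"fall back to vanilla Spark: ${result.reason()}"
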
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 55fb4ae16d..363f9c85ed 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -861,7 +861,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     .exclude("SPARK-25207: exception when duplicate fields in case-insensitive mode")
     // Rewrite for supported INT96 - timestamp.
@@ -875,8 +874,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
     .exclude("Filter applied on merged Parquet schema with new column should work")
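
In BackendTestSettings terms (a hedged summary): .exclude skips the vanilla Spark test of the given name, while .excludeGlutenTest skips the Gluten rewrite registered via testGluten. Both entries for the merged-schema test are dropped here because the scan now falls back and the unmodified upstream test passes; schematically:

    enableSuite[GlutenParquetV1FilterSuite]
      // still excluded: tests that keep a Gluten rewrite
      .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
      // no longer needed for:
      // "Filter applied on merged Parquet schema with new column should work"
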
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 51204b0777..2f690c6155 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -106,54 +105,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query
-            // should work without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
 
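
The deleted rewrite existed only because the offloaded scan mishandled merged schemas (note the checkAnswer it had to comment out for a type-check failure). With the scan forced to fall back whenever schema merging is on, the unmodified upstream ParquetFilterSuite test covers this path. The setup the removed test exercised, for anyone reproducing it locally (a sketch using Spark's test utilities):

    // The two confs the deleted test enabled; under this commit the scan
    // falls back to vanilla Spark and the upstream assertions hold.
    withSQLConf(
      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
      // write Parquet files with differing schemas, read them back with a
      // filter on a column missing from some files, and expect no exception
    }
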
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 8b56f63f65..4e8f13ee44 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -663,7 +663,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -679,11 +678,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -699,8 +695,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index a1163f9525..02b30a46a6 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -106,54 +105,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
 
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 22a9e62c09..0f3c43dfdf 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -644,7 +644,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -660,11 +659,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -680,8 +676,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index b4a4b6017b..a4f830e187 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -105,54 +104,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
 
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 3f6bea5dd1..a9525b1b0e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -655,7 +655,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -671,11 +670,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -691,8 +687,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 063b424e0d..3c52ec82e9 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -105,54 +104,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
