This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0bb41c742f [GLUTEN-7174][VL] Force fallback scan operator when spark.sql.parquet.mergeSchema enabled (#7634)
0bb41c742f is described below
commit 0bb41c742ff3adbe16263db9752dca8e64649d20
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Nov 3 21:24:06 2024 +0800
    [GLUTEN-7174][VL] Force fallback scan operator when spark.sql.parquet.mergeSchema enabled (#7634)
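
    For reviewers who want to reproduce the condition, here is a minimal
    sketch (not part of this commit; the session setup, plugin class name,
    and paths are illustrative assumptions):

        // Hypothetical repro on a Gluten-enabled session with schema merging on.
        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder()
          .master("local[2]")
          .config("spark.plugins", "org.apache.gluten.GlutenPlugin") // assumed plugin class
          .config("spark.memory.offHeap.enabled", "true")
          .config("spark.memory.offHeap.size", "1g")
          .config("spark.sql.parquet.mergeSchema", "true")
          .getOrCreate()
        import spark.implicits._

        // Two parquet directories with different schemas make schema merging matter.
        Seq((1, "x")).toDF("a", "b").write.parquet("/tmp/merge/t1")
        Seq((2, 3.0)).toDF("a", "c").write.parquet("/tmp/merge/t2")

        // With this commit the scan is expected to fall back to vanilla Spark's
        // FileSourceScanExec instead of being offloaded to a Velox scan.
        spark.read.parquet("/tmp/merge/t1", "/tmp/merge/t2").explain()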
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 5 +-
.../gluten/backendsapi/velox/VeloxBackend.scala | 142 ++++++++++++---------
.../gluten/backendsapi/BackendSettingsApi.scala | 5 +-
.../execution/BasicScanExecTransformer.scala | 2 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 3 -
.../parquet/GlutenParquetFilterSuite.scala | 49 -------
.../gluten/utils/velox/VeloxTestSettings.scala | 6 -
.../parquet/GlutenParquetFilterSuite.scala | 49 -------
.../gluten/utils/velox/VeloxTestSettings.scala | 6 -
.../parquet/GlutenParquetFilterSuite.scala | 49 -------
.../gluten/utils/velox/VeloxTestSettings.scala | 6 -
.../parquet/GlutenParquetFilterSuite.scala | 49 -------
12 files changed, 86 insertions(+), 285 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 59d912d8e7..9a1b00f714 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -142,10 +142,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       .toLowerCase(Locale.getDefault)
   }
 
-  override def validateScan(
+  override def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
-      rootPaths: Seq[String]): ValidationResult = {
+      rootPaths: Seq[String],
+      properties: Map[String, String]): ValidationResult = {
     // Validate if all types are supported.
     def hasComplexType: Boolean = {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 251b93cc7c..539059cdb6 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -35,12 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, NamedExpression, NthValue, NTile, PercentRank, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary, SpecifiedWindowFrame}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Percentile}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, SparkPlan}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -88,78 +88,94 @@ object VeloxBackendSettings extends BackendSettingsApi {
   val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".internal.udfLibraryPaths"
   val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX + ".udfAllowTypeConversion"
 
-  val MAXIMUM_BATCH_SIZE: Int = 32768
-
-  override def validateScan(
+  override def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
-      rootPaths: Seq[String]): ValidationResult = {
-    val filteredRootPaths = distinctRootPaths(rootPaths)
-    if (
-      filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
-        .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
-    ) {
-      return ValidationResult.failed(
-        s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
-    }
-    // Validate if all types are supported.
-    def validateTypes(validatorFunc: PartialFunction[StructField, String]): ValidationResult = {
-      // Collect unsupported types.
-      val unsupportedDataTypeReason = fields.collect(validatorFunc)
-      if (unsupportedDataTypeReason.isEmpty) {
-        ValidationResult.succeeded
+      rootPaths: Seq[String],
+      properties: Map[String, String]): ValidationResult = {
+
+    def validateScheme(): Option[String] = {
+      val filteredRootPaths = distinctRootPaths(rootPaths)
+      if (
+        filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper
+          .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray)
+      ) {
+        Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
       } else {
-        ValidationResult.failed(
-          s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
+        None
       }
     }
-    format match {
-      case ParquetReadFormat =>
-        val typeValidator: PartialFunction[StructField, String] = {
-          // Parquet timestamp is not fully supported yet
-          case StructField(_, TimestampType, _, _)
-              if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
-            "TimestampType"
-        }
-        validateTypes(typeValidator)
-      case DwrfReadFormat => ValidationResult.succeeded
-      case OrcReadFormat =>
-        if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
-          ValidationResult.failed(s"Velox ORC scan is turned off.")
+    def validateFormat(): Option[String] = {
+      def validateTypes(validatorFunc: PartialFunction[StructField, String]): Option[String] = {
+        // Collect unsupported types.
+        val unsupportedDataTypeReason = fields.collect(validatorFunc)
+        if (unsupportedDataTypeReason.nonEmpty) {
+          Some(
+            s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
         } else {
+          None
+        }
+      }
+
+      def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
+        val charTypePattern = "char\\((\\d+)\\)".r
+        GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern
+          .findFirstIn(
+            CharVarcharUtils
+              .getRawTypeString(metadata)
+              .getOrElse(stringType.catalogString))
+          .isDefined
+      }
+
+      format match {
+        case ParquetReadFormat =>
           val typeValidator: PartialFunction[StructField, String] = {
-            case StructField(_, arrayType: ArrayType, _, _)
-                if arrayType.elementType.isInstanceOf[StructType] =>
-              "StructType as element in ArrayType"
-            case StructField(_, arrayType: ArrayType, _, _)
-                if arrayType.elementType.isInstanceOf[ArrayType] =>
-              "ArrayType as element in ArrayType"
-            case StructField(_, mapType: MapType, _, _)
-                if mapType.keyType.isInstanceOf[StructType] =>
-              "StructType as Key in MapType"
-            case StructField(_, mapType: MapType, _, _)
-                if mapType.valueType.isInstanceOf[ArrayType] =>
-              "ArrayType as Value in MapType"
-            case StructField(_, stringType: StringType, _, metadata)
-                if isCharType(stringType, metadata) =>
-              CharVarcharUtils.getRawTypeString(metadata) + " not support"
-            case StructField(_, TimestampType, _, _) => "TimestampType not support"
+            // Parquet timestamp is not fully supported yet
+            case StructField(_, TimestampType, _, _)
+                if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
+              "TimestampType(force fallback)"
           }
-          validateTypes(typeValidator)
-        }
-      case _ => ValidationResult.failed(s"Unsupported file format for $format.")
+          val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
+          if (parquetOptions.mergeSchema) {
+            // https://github.com/apache/incubator-gluten/issues/7174
+            Some(s"not support when merge schema is true")
+          } else {
+            validateTypes(typeValidator)
+          }
+        case DwrfReadFormat => None
+        case OrcReadFormat =>
+          if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
+            Some(s"Velox ORC scan is turned off, ${GlutenConfig.VELOX_ORC_SCAN_ENABLED.key}")
+          } else {
+            val typeValidator: PartialFunction[StructField, String] = {
+              case StructField(_, arrayType: ArrayType, _, _)
+                  if arrayType.elementType.isInstanceOf[StructType] =>
+                "StructType as element in ArrayType"
+              case StructField(_, arrayType: ArrayType, _, _)
+                  if arrayType.elementType.isInstanceOf[ArrayType] =>
+                "ArrayType as element in ArrayType"
+              case StructField(_, mapType: MapType, _, _)
+                  if mapType.keyType.isInstanceOf[StructType] =>
+                "StructType as Key in MapType"
+              case StructField(_, mapType: MapType, _, _)
+                  if mapType.valueType.isInstanceOf[ArrayType] =>
+                "ArrayType as Value in MapType"
+              case StructField(_, stringType: StringType, _, metadata)
+                  if isCharType(stringType, metadata) =>
+                CharVarcharUtils.getRawTypeString(metadata) + "(force fallback)"
+              case StructField(_, TimestampType, _, _) => "TimestampType"
+            }
+            validateTypes(typeValidator)
+          }
+        case _ => Some(s"Unsupported file format for $format.")
+      }
     }
-  }
-  def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
-    val charTypePattern = "char\\((\\d+)\\)".r
-    GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern
-      .findFirstIn(
-        CharVarcharUtils
-          .getRawTypeString(metadata)
-          .getOrElse(stringType.catalogString))
-      .isDefined
+    validateScheme().orElse(validateFormat()) match {
+      case Some(reason) => ValidationResult.failed(reason)
+      case _ => ValidationResult.succeeded
+    }
   }
   def distinctRootPaths(paths: Seq[String]): Seq[String] = {
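
Note on the mergeSchema detection above: Spark's ParquetOptions resolves the
flag from the per-relation options first and falls back to
spark.sql.parquet.mergeSchema in SQLConf, so either the reader option or the
session conf triggers the new fallback. A minimal sketch of that resolution
(the properties map is a hypothetical stand-in for a scan's options):

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
    import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
    import org.apache.spark.sql.internal.SQLConf

    // e.g. what spark.read.option("mergeSchema", "true") would contribute.
    val properties = Map("mergeSchema" -> "true")

    // Prefers the per-read option; otherwise falls back to the session-level
    // spark.sql.parquet.mergeSchema setting carried by SQLConf.
    val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
    assert(parquetOptions.mergeSchema) // the condition that now forces fallback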
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index f1f46dd87e..177d19c0c7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -30,10 +30,11 @@ import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF
 import org.apache.spark.sql.types.StructField
 
 trait BackendSettingsApi {
-  def validateScan(
+  def validateScanExec(
      format: ReadFileFormat,
      fields: Array[StructField],
-      rootPaths: Seq[String]): ValidationResult = ValidationResult.succeeded
+      rootPaths: Seq[String],
+      properties: Map[String, String]): ValidationResult = ValidationResult.succeeded
 
   def supportWriteFilesExec(
       format: FileFormat,
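
To illustrate the extended hook, a minimal sketch of a backend override that
keys off the new properties argument (illustrative only; the import paths are
assumed, and a real backend combines this with the scheme and type checks
shown in the Velox diff above):

    // Assumed package locations for the types in the trait's signature.
    import org.apache.gluten.backendsapi.BackendSettingsApi
    import org.apache.gluten.extension.ValidationResult
    import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
    import org.apache.spark.sql.types.StructField

    object ExampleBackendSettings extends BackendSettingsApi {
      override def validateScanExec(
          format: ReadFileFormat,
          fields: Array[StructField],
          rootPaths: Seq[String],
          properties: Map[String, String]): ValidationResult = {
        // Reject offload when the relation carries mergeSchema=true.
        if (properties.get("mergeSchema").exists(_.equalsIgnoreCase("true"))) {
          ValidationResult.failed("mergeSchema is enabled; falling back to vanilla scan")
        } else {
          ValidationResult.succeeded
        }
      }
    }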
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index d7b824b397..f272dc3eca 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -95,7 +95,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
     }
 
     val validationResult = BackendsApiManager.getSettings
-      .validateScan(fileFormat, fields, getRootFilePaths)
+      .validateScanExec(fileFormat, fields, getRootFilePaths, getProperties)
     if (!validationResult.ok()) {
       return validationResult
     }
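
The getProperties value forwarded here carries the scan relation's options,
so a per-read setting reaches the validator as well as the session conf. A
hypothetical end-to-end check (the path is illustrative):

    // The "mergeSchema" reader option lands in the scan's properties, which
    // BasicScanExecTransformer now forwards to validateScanExec.
    val df = spark.read
      .option("mergeSchema", "true")
      .parquet("/tmp/merge")
    df.explain() // expected: a vanilla FileSourceScanExec, i.e. a fallback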
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 55fb4ae16d..363f9c85ed 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -861,7 +861,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     .exclude("SPARK-25207: exception when duplicate fields in case-insensitive mode")
     // Rewrite for supported INT96 - timestamp.
@@ -875,8 +874,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
     .exclude("Filter applied on merged Parquet schema with new column should work")
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 51204b0777..2f690c6155 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -106,54 +105,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query
-            // should work without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 8b56f63f65..4e8f13ee44 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -663,7 +663,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -679,11 +678,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -699,8 +695,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index a1163f9525..02b30a46a6 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -106,54 +105,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 22a9e62c09..0f3c43dfdf 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -644,7 +644,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -660,11 +659,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -680,8 +676,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index b4a4b6017b..a4f830e187 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -105,54 +104,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 3f6bea5dd1..a9525b1b0e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -655,7 +655,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -671,11 +670,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -691,8 +687,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 063b424e0d..3c52ec82e9 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -105,54 +104,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]