This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 724d6a8 [SPARK-36645][SQL][FOLLOWUP] Disable min/max push down for Parquet Binary
724d6a8 is described below
commit 724d6a83df0aa7927cbaa205fcd7c48f544432bc
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Oct 22 13:28:54 2021 +0900
[SPARK-36645][SQL][FOLLOWUP] Disable min/max push down for Parquet Binary
### What changes were proposed in this pull request?
Disable min/max push down for Parquet Binary
### Why are the changes needed?
Parquet Binary min/max statistics can be truncated (PARQUET-1685), so we may get wrong results if we rely on Parquet Binary min/max. The sketch below illustrates the hazard.
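For illustration, a minimal sketch of the hazard (hypothetical code, not Spark or parquet-mr): `truncatedMax` stands in for a writer truncating a long binary max statistic and bumping the last kept byte, which yields a valid upper bound that need not be an actual value in the column.

```scala
// Hypothetical stand-in for Parquet binary statistics truncation (PARQUET-1685):
// keep only a prefix of a long max value and increment the last kept byte, so the
// result is still an upper bound of all values but not necessarily a real value.
object TruncatedBinaryStatsSketch {
  def truncatedMax(value: Array[Byte], maxLen: Int): Array[Byte] =
    if (value.length <= maxLen) {
      value
    } else {
      val prefix = value.take(maxLen)
      // Bump the last kept byte (ignoring 0xFF overflow for simplicity).
      prefix(maxLen - 1) = (prefix(maxLen - 1) + 1).toByte
      prefix
    }

  def main(args: Array[String]): Unit = {
    val realMax = "Spark SQL".getBytes("UTF-8")
    val statsMax = truncatedMax(realMax, 5)
    // Prints "Sparl": an upper bound, but not a value that occurs in the data,
    // so answering MAX(col) from this statistic would be wrong.
    println(new String(statsMax, "UTF-8"))
  }
}
```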
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified existing tests.
Closes #34346 from huaxingao/disableBinary.
Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../v2/parquet/ParquetScanBuilder.scala | 12 ++--
.../parquet/ParquetAggregatePushDownSuite.scala | 69 +++++++++++-----------
2 files changed, 41 insertions(+), 40 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index c579867..113438a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, Spark
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 case class ParquetScanBuilder(
@@ -114,11 +114,15 @@ case class ParquetScanBuilder(
         // not push down complex type
         // not push down Timestamp because INT96 sort order is undefined,
         // Parquet doesn't return statistics for INT96
-        case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType =>
-          false
-        case _ =>
+        // not push down Parquet Binary because min/max could be truncated
+        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
+        // could be Spark StringType, BinaryType or DecimalType
+        case BooleanType | ByteType | ShortType | IntegerType
+          | LongType | FloatType | DoubleType | DateType =>
           finalSchema = finalSchema.add(structField.copy(s"$aggType(" +
             structField.name + ")"))
           true
+        case _ =>
+          false
       }
     }
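For context, a hedged sketch of the user-visible effect of the allowlist above. Table `t` and columns `strCol`/`intCol` are hypothetical; the config key is the one behind `SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED` used in the tests below.

```scala
// Illustrative only: with aggregate push down enabled, MIN/MAX over a
// binary-backed column (string/binary/decimal) is no longer pushed down,
// while a numeric column still is.
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
spark.sql("SELECT min(strCol) FROM t").explain()  // expect "PushedAggregation: []"
spark.sql("SELECT min(intCol) FROM t").explain()  // expect "PushedAggregation: [MIN(intCol)]"
```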
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index c795bd9..0ae95db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -361,89 +361,86 @@ abstract class ParquetAggregatePushDownSuite
     withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
       vectorizedReaderEnabledKey -> testVectorizedReader) {
-      val testMinWithTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
+      val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
         "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
         "min(DoubleCol), min(DecimalCol), min(DateCol), min(TimestampCol) FROM test")
       // INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
       // so aggregates are not pushed down
-      testMinWithTS.queryExecution.optimizedPlan.collect {
+      // In addition, Parquet Binary min/max could be truncated, so we disable aggregate
+      // push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
+      testMinWithAllTypes.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
             "PushedAggregation: []"
-          checkKeywordsExistsInExplain(testMinWithTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMinWithAllTypes, expected_plan_fragment)
       }
-      checkAnswer(testMinWithTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
-        2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
-        ("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))
+      checkAnswer(testMinWithAllTypes, Seq(Row("a string", false, 1.toByte,
+        "Parquet".getBytes, 2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D,
+        1.23457, ("2004-06-19").date, ("1999-08-26 10:43:59.123").ts)))

-      val testMinWithOutTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
-        "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
-        "min(DoubleCol), min(DecimalCol), min(DateCol) FROM test")
+      val testMinWithOutTSAndBinary = sql("SELECT min(BooleanCol), min(ByteCol), " +
+        "min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
+        "min(DoubleCol), min(DateCol) FROM test")

-      testMinWithOutTS.queryExecution.optimizedPlan.collect {
+      testMinWithOutTSAndBinary.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
-            "PushedAggregation: [MIN(StringCol), " +
-            "MIN(BooleanCol), " +
+            "PushedAggregation: [MIN(BooleanCol), " +
             "MIN(ByteCol), " +
-            "MIN(BinaryCol), " +
             "MIN(ShortCol), " +
             "MIN(IntegerCol), " +
             "MIN(LongCol), " +
             "MIN(FloatCol), " +
             "MIN(DoubleCol), " +
-            "MIN(DecimalCol), " +
             "MIN(DateCol)]"
-          checkKeywordsExistsInExplain(testMinWithOutTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMinWithOutTSAndBinary, expected_plan_fragment)
       }
-      checkAnswer(testMinWithOutTS, Seq(Row("a string", false, 1.toByte, "Parquet".getBytes,
-        2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, 1.23457,
-        ("2004-06-19").date)))
+      checkAnswer(testMinWithOutTSAndBinary, Seq(Row(false, 1.toByte,
+        2.toShort, 3, -9223372036854775808L, 0.15.toFloat, 0.75D, ("2004-06-19").date)))

-      val testMaxWithTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
-        "max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
-        "max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) FROM test")
+      val testMaxWithAllTypes = sql("SELECT max(StringCol), max(BooleanCol), " +
+        "max(ByteCol), max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), " +
+        "max(FloatCol), max(DoubleCol), max(DecimalCol), max(DateCol), max(TimestampCol) " +
+        "FROM test")
       // INT96 (Timestamp) sort order is undefined, parquet doesn't return stats for this type
       // so aggregates are not pushed down
-      testMaxWithTS.queryExecution.optimizedPlan.collect {
+      // In addition, Parquet Binary min/max could be truncated, so we disable aggregate
+      // push down for Parquet Binary (could be Spark StringType, BinaryType or DecimalType)
+      testMaxWithAllTypes.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
             "PushedAggregation: []"
-          checkKeywordsExistsInExplain(testMaxWithTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMaxWithAllTypes, expected_plan_fragment)
       }
-      checkAnswer(testMaxWithTS, Seq(Row("test string", true, 16.toByte,
+      checkAnswer(testMaxWithAllTypes, Seq(Row("test string", true, 16.toByte,
         "Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D,
         12345.678, ("2021-01-01").date, ("2021-01-01 23:50:59.123").ts)))

-      val testMaxWithoutTS = sql("SELECT max(StringCol), max(BooleanCol), max(ByteCol), " +
-        "max(BinaryCol), max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
-        "max(DoubleCol), max(DecimalCol), max(DateCol) FROM test")
+      val testMaxWithoutTSAndBinary = sql("SELECT max(BooleanCol), max(ByteCol), " +
+        "max(ShortCol), max(IntegerCol), max(LongCol), max(FloatCol), " +
+        "max(DoubleCol), max(DateCol) FROM test")

-      testMaxWithoutTS.queryExecution.optimizedPlan.collect {
+      testMaxWithoutTSAndBinary.queryExecution.optimizedPlan.collect {
         case _: DataSourceV2ScanRelation =>
           val expected_plan_fragment =
-            "PushedAggregation: [MAX(StringCol), " +
-            "MAX(BooleanCol), " +
+            "PushedAggregation: [MAX(BooleanCol), " +
             "MAX(ByteCol), " +
-            "MAX(BinaryCol), " +
             "MAX(ShortCol), " +
             "MAX(IntegerCol), " +
             "MAX(LongCol), " +
             "MAX(FloatCol), " +
             "MAX(DoubleCol), " +
-            "MAX(DecimalCol), " +
             "MAX(DateCol)]"
-          checkKeywordsExistsInExplain(testMaxWithoutTS, expected_plan_fragment)
+          checkKeywordsExistsInExplain(testMaxWithoutTSAndBinary, expected_plan_fragment)
       }
-      checkAnswer(testMaxWithoutTS, Seq(Row("test string", true, 16.toByte,
-        "Spark SQL".getBytes, 222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D,
-        12345.678, ("2021-01-01").date)))
+      checkAnswer(testMaxWithoutTSAndBinary, Seq(Row(true, 16.toByte,
+        222.toShort, 113, 9223372036854775807L, 0.25.toFloat, 0.85D, ("2021-01-01").date)))

       val testCount = sql("SELECT count(StringCol), count(BooleanCol)," +
         " count(ByteCol), count(BinaryCol), count(ShortCol), count(IntegerCol)," +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]