This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new e8dd144abca [SPARK-39634][SQL] Allow file splitting in combination
with row index generation
e8dd144abca is described below
commit e8dd144abcab58870aa730517b6cea5121b5868e
Author: Venki Korukanti <[email protected]>
AuthorDate: Fri Jul 21 13:16:23 2023 +0800
[SPARK-39634][SQL] Allow file splitting in combination with row index
generation
### What changes were proposed in this pull request?
- Parquet version `1.13.1` has a fix for
[PARQUET-2161](https://issues.apache.org/jira/browse/PARQUET-2161) which allows
splitting Parquet files when the row index metadata column is selected.
Currently the file splitting is disabled. Enable file splitting with row index
column.
### Why are the changes needed?
Splitting Parquet files allows better parallelization when the row index
metadata column is selected.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Uncomment the existing unit tests.
Closes #40728 from vkorukanti/SPARK-39634.
Authored-by: Venki Korukanti <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 679ea56dc8be7566c32a606639aa052421136afc)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 8 ++------
.../sql/execution/datasources/parquet/ParquetRowIndexUtil.scala | 4 ----
.../spark/sql/execution/datasources/v2/parquet/ParquetScan.scala | 7 ++-----
.../sql/execution/datasources/parquet/ParquetRowIndexSuite.scala | 5 +----
4 files changed, 5 insertions(+), 19 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 243aaabc0cb..6375cdacaa0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -31,7 +31,7 @@ import
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource, ParquetRowIndexUtil}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
@@ -691,11 +691,7 @@ case class FileSourceScanExec(
partition.files.flatMap { file =>
if (shouldProcess(file.getPath)) {
val isSplitable = relation.fileFormat.isSplitable(
- relation.sparkSession, relation.options, file.getPath) &&
- // SPARK-39634: Allow file splitting in combination with row index
generation once
- // the fix for PARQUET-2161 is available.
- (!relation.fileFormat.isInstanceOf[ParquetSource]
- || !ParquetRowIndexUtil.isNeededForSchema(requiredSchema))
+ relation.sparkSession, relation.options, file.getPath)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
index b1cd2ebee42..a5d8494cfa7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
@@ -126,10 +126,6 @@ object ParquetRowIndexUtil {
}
}
- def isNeededForSchema(sparkSchema: StructType): Boolean = {
- findRowIndexColumnIndexInSchema(sparkSchema) >= 0
- }
-
def isRowIndexColumn(column: ParquetColumn): Boolean = {
column.path.length == 1 && column.path.last ==
ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index c7819d39cbc..0e77b419ff5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions,
ParquetReadSupport, ParquetRowIndexUtil, ParquetWriteSupport}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions,
ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
@@ -50,10 +50,7 @@ case class ParquetScan(
override def isSplitable(path: Path): Boolean = {
// If aggregate is pushed down, only the file footer will be read once,
// so file should not be split across multiple tasks.
- pushedAggregate.isEmpty &&
- // SPARK-39634: Allow file splitting in combination with row index
generation once
- // the fix for PARQUET-2161 is available.
- !ParquetRowIndexUtil.isNeededForSchema(readSchema)
+ pushedAggregate.isEmpty
}
override def readSchema(): StructType = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
index dd350ffd315..27c2a2148fd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
@@ -249,10 +249,7 @@ class ParquetRowIndexSuite extends QueryTest with
SharedSparkSession {
assert(numOutputRows > 0)
if (conf.useSmallSplits) {
- // SPARK-39634: Until the fix the fix for PARQUET-2161 is
available is available,
- // it is not possible to split Parquet files into multiple
partitions while generating
- // row indexes.
- // assert(numPartitions >= 2 * conf.numFiles)
+ assert(numPartitions >= 2 * conf.numFiles)
}
// Assert that every rowIdx value matches the value in
`expectedRowIdx`.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]