This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new e8dd144abca [SPARK-39634][SQL] Allow file splitting in combination 
with row index generation
e8dd144abca is described below

commit e8dd144abcab58870aa730517b6cea5121b5868e
Author: Venki Korukanti <[email protected]>
AuthorDate: Fri Jul 21 13:16:23 2023 +0800

    [SPARK-39634][SQL] Allow file splitting in combination with row index 
generation
    
    ### What changes were proposed in this pull request?
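    - Parquet version `1.13.1` includes the fix for 
[PARQUET-2161](https://issues.apache.org/jira/browse/PARQUET-2161), which allows 
Parquet files to be split when the row index metadata column is selected. 
File splitting is currently disabled in that case. This change enables it by 
removing the row-index checks from `FileSourceScanExec` and 
`ParquetScan.isSplitable`, and drops the now-unused 
`ParquetRowIndexUtil.isNeededForSchema` helper.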
    
    ### Why are the changes needed?
    Splitting Parquet files allows better parallelization when the row index 
metadata column is selected.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
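    Re-enabled the existing unit-test assertion in `ParquetRowIndexSuite` that 
was commented out (`assert(numPartitions >= 2 * conf.numFiles)`).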
    
    Closes #40728 from vkorukanti/SPARK-39634.
    
    Authored-by: Venki Korukanti <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 679ea56dc8be7566c32a606639aa052421136afc)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 8 ++------
 .../sql/execution/datasources/parquet/ParquetRowIndexUtil.scala   | 4 ----
 .../spark/sql/execution/datasources/v2/parquet/ParquetScan.scala  | 7 ++-----
 .../sql/execution/datasources/parquet/ParquetRowIndexSuite.scala  | 5 +----
 4 files changed, 5 insertions(+), 19 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 243aaabc0cb..6375cdacaa0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -31,7 +31,7 @@ import 
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource, ParquetRowIndexUtil}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
@@ -691,11 +691,7 @@ case class FileSourceScanExec(
       partition.files.flatMap { file =>
         if (shouldProcess(file.getPath)) {
           val isSplitable = relation.fileFormat.isSplitable(
-              relation.sparkSession, relation.options, file.getPath) &&
-            // SPARK-39634: Allow file splitting in combination with row index 
generation once
-            // the fix for PARQUET-2161 is available.
-            (!relation.fileFormat.isInstanceOf[ParquetSource]
-              || !ParquetRowIndexUtil.isNeededForSchema(requiredSchema))
+              relation.sparkSession, relation.options, file.getPath)
           PartitionedFileUtil.splitFiles(
             sparkSession = relation.sparkSession,
             file = file,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
index b1cd2ebee42..a5d8494cfa7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
@@ -126,10 +126,6 @@ object ParquetRowIndexUtil {
     }
   }
 
-  def isNeededForSchema(sparkSchema: StructType): Boolean = {
-    findRowIndexColumnIndexInSchema(sparkSchema) >= 0
-  }
-
   def isRowIndexColumn(column: ParquetColumn): Boolean = {
     column.path.length == 1 && column.path.last == 
ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index c7819d39cbc..0e77b419ff5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, 
PartitioningAwareFileIndex}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetReadSupport, ParquetRowIndexUtil, ParquetWriteSupport}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetReadSupport, ParquetWriteSupport}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
@@ -50,10 +50,7 @@ case class ParquetScan(
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
     // so file should not be split across multiple tasks.
-    pushedAggregate.isEmpty &&
-      // SPARK-39634: Allow file splitting in combination with row index 
generation once
-      // the fix for PARQUET-2161 is available.
-      !ParquetRowIndexUtil.isNeededForSchema(readSchema)
+    pushedAggregate.isEmpty
   }
 
   override def readSchema(): StructType = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
index dd350ffd315..27c2a2148fd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
@@ -249,10 +249,7 @@ class ParquetRowIndexSuite extends QueryTest with 
SharedSparkSession {
           assert(numOutputRows > 0)
 
           if (conf.useSmallSplits) {
-            // SPARK-39634: Until the fix the fix for PARQUET-2161 is 
available is available,
-            // it is not possible to split Parquet files into multiple 
partitions while generating
-            // row indexes.
-            // assert(numPartitions >= 2 * conf.numFiles)
+            assert(numPartitions >= 2 * conf.numFiles)
           }
 
           // Assert that every rowIdx value matches the value in 
`expectedRowIdx`.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to