This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 95aebcbf100 [SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet files
95aebcbf100 is described below

commit 95aebcbf100de1dbedd32626ce67bd01014c973e
Author: Ala Luszczak <a...@databricks.com>
AuthorDate: Tue Aug 16 21:03:42 2022 +0800

    [SPARK-37980][SQL] Extend METADATA column to support row indexes for Parquet files
    
    ### What changes were proposed in this pull request?
    
    This change adds a `row_index` column to the `_metadata` struct. This column
    allows us to uniquely identify rows read from a given file with an index number.
    The n-th row in a given file will be assigned row index `n-1` in every scan of
    the file, irrespective of file splitting and data skipping in use.
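    
    For example (a minimal sketch, assuming a Parquet table at a hypothetical path
    `/tmp/events`), the new field surfaces as an extra entry in the `_metadata` struct:
    
    ```scala
    // Illustrative only: for Parquet sources, _metadata now carries a row_index field
    // alongside file_path, file_name, file_size and file_modification_time.
    val df = spark.read.parquet("/tmp/events")
    df.select("_metadata").printSchema()
    ```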
    
    The new column requires file-format-specific support. This change introduces
    Parquet support; other formats can follow later.
    
    ### Why are the changes needed?
    
    Row indexes can be used in a variety of ways. A (fileName, rowIndex) tuple
    uniquely identifies a row in a table. This information can be used to mark rows,
    e.g. to build an indexer, as sketched below.
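    
    For illustration only (a minimal sketch, assuming a Parquet table at a
    hypothetical path `/tmp/events`; the output column names are made up), such an
    identifier can be derived as:
    
    ```scala
    import org.apache.spark.sql.functions.col
    
    // Pair every row with a stable (file, row) identifier taken from the metadata struct.
    val tagged = spark.read.parquet("/tmp/events")
      .select(
        col("*"),
        col("_metadata.file_name").as("source_file"),
        col("_metadata.row_index").as("source_row_index"))
    ```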
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. With this change, customers will be able to access the
    `_metadata.row_index` metadata column when reading Parquet data. The schema of
    the `_metadata` column remains unchanged for file formats without row index
    support.
    
    ### How was this patch tested?
    
    * Added `FileMetadataStructRowIndexSuite.scala` to make sure the feature works
      correctly in different scenarios (supported/unsupported file format,
      batch/record reads, on/off heap memory, ...).
    * Added `ParquetRowIndexSuite.scala` to make sure the row indexes are generated
      correctly for Parquet files in conjunction with any combination of data
      skipping features.
    * Extended `FileMetadataStructSuite.scala` to account for the new column in the
      `_metadata` struct.
    
    Closes #37228 from ala/row-idx-v4.
    
    Lead-authored-by: Ala Luszczak <a...@databricks.com>
    Co-authored-by: IonutBoicu <ionut.bo...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 ...ParquetNestedSchemaPruningBenchmark-results.txt |  60 ++--
 .../datasources/parquet/ParquetColumnVector.java   |   5 +
 .../parquet/VectorizedParquetRecordReader.java     |  14 +
 .../spark/sql/execution/DataSourceScanExec.scala   |  20 +-
 .../sql/execution/datasources/DataSource.scala     |   2 +-
 .../sql/execution/datasources/FileFormat.scala     |  46 ++-
 .../sql/execution/datasources/FileScanRDD.scala    |   9 +-
 .../execution/datasources/FileSourceStrategy.scala |  62 +++-
 .../execution/datasources/LogicalRelation.scala    |   4 +-
 .../sql/execution/datasources/RowIndexUtil.scala   |  40 +++
 .../datasources/parquet/ParquetFileFormat.scala    |   6 +-
 .../datasources/parquet/ParquetRowIndexUtil.scala  | 120 ++++++++
 .../v2/parquet/ParquetPartitionReaderFactory.scala |   6 +-
 .../datasources/v2/parquet/ParquetScan.scala       |   7 +-
 .../execution/streaming/StreamingRelation.scala    |   3 +-
 .../benchmark/MetadataStructBenchmark.scala        |  85 ++++++
 .../FileMetadataStructRowIndexSuite.scala          | 235 ++++++++++++++++
 .../datasources/FileMetadataStructSuite.scala      |  66 ++---
 .../datasources/parquet/ParquetRowIndexSuite.scala | 313 +++++++++++++++++++++
 19 files changed, 1000 insertions(+), 103 deletions(-)

diff --git a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
index ec149dce313..ca035973065 100644
--- a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
@@ -2,52 +2,52 @@
 Nested Schema Pruning Benchmark For Parquet
 ================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
 Selection:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    104            114          14          9.6         104.0       1.0X
-Nested column                                       111            121          20          9.0         110.5       0.9X
-Nested column in array                              321            328           8          3.1         320.5       0.3X
+Top-level column                                     91            131          30         10.9          91.4       1.0X
+Nested column                                        73            109          32         13.6          73.5       1.2X
+Nested column in array                              248            266          28          4.0         248.1       0.4X
 
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
 Limiting:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    373            384           8          2.7         373.4       1.0X
-Nested column                                       386            400          10          2.6         385.8       1.0X
-Nested column in array                              831            859          22          1.2         831.2       0.4X
+Top-level column                                    268            386         154          3.7         268.4       1.0X
+Nested column                                       271            305          22          3.7         271.3       1.0X
+Nested column in array                              648            737          49          1.5         648.0       0.4X
 
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
 Repartitioning:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    353            366           9          2.8         353.1       1.0X
-Nested column                                       366            373           6          2.7         366.3       1.0X
-Nested column in array                              772            793          14          1.3         771.8       0.5X
+Top-level column                                    227            259          24          4.4         226.6       1.0X
+Nested column                                       271            429         182          3.7         271.2       0.8X
+Nested column in array                              644            751         114          1.6         644.1       0.4X
 
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
 Repartitioning by exprs:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    333            339           5          3.0         333.4       1.0X
-Nested column                                       370            388          12          2.7         369.7       0.9X
-Nested column in array                              784            830          21          1.3         784.5       0.4X
+Top-level column                                    263            306          29          3.8         262.8       1.0X
+Nested column                                       323            385          48          3.1         322.5       0.8X
+Nested column in array                              637            789         195          1.6         637.1       0.4X
 
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
 Sample:                                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    123            149          30          8.1         123.1       1.0X
-Nested column                                       133            157          37          7.5         132.8       0.9X
-Nested column in array                              382            420          16          2.6         381.6       0.3X
+Top-level column                                     84            129          43         12.0          83.5       1.0X
+Nested column                                        83             99          13         12.1          82.7       1.0X
+Nested column in array                              312            366          47          3.2         312.2       0.3X
 
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
 Sorting:                                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    567            579          10          1.8         566.6       1.0X
-Nested column                                       637            652          15          1.6         637.1       0.9X
-Nested column in array                             1171           1212          28          0.9        1171.4       0.5X
+Top-level column                                    342            420          54          2.9         342.2       1.0X
+Nested column                                       423            441          17          2.4         422.6       0.8X
+Nested column in array                              821            870          28          1.2         821.1       0.4X
 
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index 47774e0a397..5272151acf2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -75,6 +75,11 @@ final class ParquetColumnVector {
     this.isPrimitive = column.isPrimitive();
 
     if (missingColumns.contains(column)) {
+      if (ParquetRowIndexUtil.isRowIndexColumn(column)) {
+        // The values of row index column are going to be generated by the 
reader instead.
+        return;
+      }
+
       if (defaultValue == null) {
         vector.setAllNull();
         return;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 63fdec4056f..97f739c5bf2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -132,6 +132,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private boolean returnColumnarBatch;
 
+  /**
+   * Populates the row index column if needed.
+   */
+  private ParquetRowIndexUtil.RowIndexGenerator rowIndexGenerator = null;
+
   /**
    * The memory mode of the columnarBatch
    */
@@ -275,6 +280,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
           (ConstantColumnVector) vectors[i + partitionIdx], partitionValues, 
i);
       }
     }
+
+    rowIndexGenerator = 
ParquetRowIndexUtil.createGeneratorIfNeeded(sparkSchema);
   }
 
   private void initBatch() {
@@ -324,6 +331,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
       }
       cv.assemble();
     }
+    // If needed, compute row indexes within a file.
+    if (rowIndexGenerator != null) {
+      rowIndexGenerator.populateRowIndex(columnVectors, num);
+    }
 
     rowsReturned += num;
     columnarBatch.setNumRows(num);
@@ -395,6 +406,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
       throw new IOException("expecting more rows but reached last block. Read "
           + rowsReturned + " out of " + totalRowCount);
     }
+    if (rowIndexGenerator != null) {
+      rowIndexGenerator.initFromPageReadStore(pages);
+    }
     for (ParquetColumnVector cv : columnVectors) {
       initColumnReader(pages, cv);
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5950136e79a..fdb49bd7674 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, 
OffHeapColumnVector, OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -214,8 +214,17 @@ trait FileSourceScanLike extends DataSourceScanExec {
       requiredSchema = requiredSchema,
       partitionSchema = relation.partitionSchema,
       relation.sparkSession.sessionState.conf).map { vectorTypes =>
-        // for column-based file format, append metadata column's vector type 
classes if any
-        vectorTypes ++ 
Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
+        vectorTypes ++
+          // for column-based file format, append metadata column's vector 
type classes if any
+          metadataColumns.map { metadataCol =>
+            if (FileFormat.isConstantMetadataAttr(metadataCol.name)) {
+              classOf[ConstantColumnVector].getName
+            } else if 
(relation.sparkSession.sessionState.conf.offHeapColumnVectorEnabled) {
+              classOf[OffHeapColumnVector].getName
+            } else {
+              classOf[OnHeapColumnVector].getName
+            }
+          }
       }
 
   lazy val driverMetrics = Map(
@@ -690,7 +699,10 @@ case class FileSourceScanExec(
 
         if (shouldProcess(filePath)) {
           val isSplitable = relation.fileFormat.isSplitable(
-            relation.sparkSession, relation.options, filePath)
+              relation.sparkSession, relation.options, filePath) &&
+            // SPARK-39634: Allow file splitting in combination with row index 
generation once
+            // the fix for PARQUET-2161 is available.
+            !RowIndexUtil.isNeededForSchema(requiredSchema)
           PartitionedFileUtil.splitFiles(
             sparkSession = relation.sparkSession,
             file = file,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 8f8846b89f3..d50fd88f65c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -111,7 +111,7 @@ case class DataSource(
     }
   }
 
-  private def providingInstance() = 
providingClass.getConstructor().newInstance()
+  private[sql] def providingInstance(): Any = 
providingClass.getConstructor().newInstance()
 
   private def newHadoopConfiguration(): Configuration =
     sparkSession.sessionState.newHadoopConfWithOptions(options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index f9b37fb5d9f..f7f917d8947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{DataType, LongType, StringType, 
StructField, StructType, TimestampType}
@@ -182,18 +183,32 @@ object FileFormat {
 
   val FILE_MODIFICATION_TIME = "file_modification_time"
 
+  val ROW_INDEX = "row_index"
+
+  // A name for a temporary column that holds row indexes computed by the file 
format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
   val METADATA_NAME = "_metadata"
 
-  // supported metadata struct fields for hadoop fs relation
-  val METADATA_STRUCT: StructType = new StructType()
-    .add(StructField(FILE_PATH, StringType))
-    .add(StructField(FILE_NAME, StringType))
-    .add(StructField(FILE_SIZE, LongType))
-    .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+  /** Schema of metadata struct that can be produced by every file format. */
+  val BASE_METADATA_STRUCT: StructType = new StructType()
+    .add(StructField(FileFormat.FILE_PATH, StringType))
+    .add(StructField(FileFormat.FILE_NAME, StringType))
+    .add(StructField(FileFormat.FILE_SIZE, LongType))
+    .add(StructField(FileFormat.FILE_MODIFICATION_TIME, TimestampType))
 
-  // create a file metadata struct col
-  def createFileMetadataCol: AttributeReference =
-    FileSourceMetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+  /**
+   * Create a file metadata struct column containing fields supported by the 
given file format.
+   */
+  def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
+    val struct = if (fileFormat.isInstanceOf[ParquetFileFormat]) {
+      BASE_METADATA_STRUCT.add(StructField(FileFormat.ROW_INDEX, LongType))
+    } else {
+      BASE_METADATA_STRUCT
+    }
+    FileSourceMetadataAttribute(FileFormat.METADATA_NAME, struct)
+  }
 
   // create an internal row given required metadata fields and file information
   def createMetadataInternalRow(
@@ -220,10 +235,23 @@ object FileFormat {
           // the modificationTime from the file is in millisecond,
           // while internally, the TimestampType `file_modification_time` is 
stored in microsecond
           row.update(i, fileModificationTime * 1000L)
+        case ROW_INDEX =>
+          // Do nothing. Only the metadata fields that have identical values 
for each row of the
+          // file are set by this function, while fields that have different 
values (such as row
+          // index) are set separately.
       }
     }
     row
   }
+
+  /**
+   * Returns true if the given metadata column always contains identical 
values for all rows
+   * originating from the same data file.
+   */
+  def isConstantMetadataAttr(name: String): Boolean = name match {
+    case FILE_PATH | FILE_NAME | FILE_SIZE | FILE_MODIFICATION_TIME => true
+    case ROW_INDEX => false
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 4c3f5629e78..827d41dd096 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.FileFormat._
 import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
 import org.apache.spark.sql.types.{LongType, StringType, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator
 
@@ -133,8 +133,9 @@ class FileScanRDD(
       }
 
       /**
-       * For each partitioned file, metadata columns for each record in the 
file are exactly same.
-       * Only update metadata row when `currentFile` is changed.
+       * The value of some of the metadata columns remains exactly the same 
for each record of
+       * a partitioned file. Only need to update their values in the metadata 
row when `currentFile`
+       * is changed.
        */
       private def updateMetadataRow(): Unit =
         if (metadataColumns.nonEmpty && currentFile != null) {
@@ -145,7 +146,7 @@ class FileScanRDD(
       /**
        * Create an array of constant column vectors containing all required 
metadata columns
        */
-      private def createMetadataColumnVector(c: ColumnarBatch): 
Array[ConstantColumnVector] = {
+      private def createMetadataColumnVector(c: ColumnarBatch): 
Array[ColumnVector] = {
         val path = new Path(currentFile.filePath)
         metadataColumns.map(_.name).map {
           case FILE_PATH =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4995a0d6cd4..22ad7960cdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -26,7 +28,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
-import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
+import org.apache.spark.sql.types.{DoubleType, FloatType, LongType, StructType}
 import org.apache.spark.util.collection.BitSet
 
 /**
@@ -206,13 +208,6 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq 
++ projects
       val requiredAttributes = AttributeSet(requiredExpressions)
 
-      val readDataColumns =
-        dataColumns
-          .filter(requiredAttributes.contains)
-          .filterNot(partitionColumns.contains)
-      val outputSchema = readDataColumns.toStructType
-      logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
-
       val metadataStructOpt = l.output.collectFirst {
         case FileSourceMetadataAttribute(attr) => attr
       }
@@ -223,14 +218,45 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
         }.toSeq
       }.getOrElse(Seq.empty)
 
-      // outputAttributes should also include the metadata columns at the very 
end
-      val outputAttributes = readDataColumns ++ partitionColumns ++ 
metadataColumns
+      val fileConstantMetadataColumns: Seq[Attribute] =
+        metadataColumns.filter(_.name != FileFormat.ROW_INDEX)
+
+      val readDataColumns = dataColumns
+          .filter(requiredAttributes.contains)
+          .filterNot(partitionColumns.contains)
+
+      val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+        metadataColumns.map(_.name).flatMap {
+          case FileFormat.ROW_INDEX =>
+            if ((readDataColumns ++ 
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
+                .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
+              throw new 
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
+                " is a reserved column name that cannot be read in combination 
with " +
+                s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column.")
+            }
+            
Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())
+          case _ => None
+        }
+
+      val outputDataSchema = (readDataColumns ++ 
fileFormatReaderGeneratedMetadataColumns)
+        .toStructType
+
+      // The output rows will be produced during file scan operation in three 
steps:
+      //  (1) File format reader populates a `Row` with `readDataColumns` and
+      //      `fileFormatReaderGeneratedMetadataColumns`
+      //  (2) Then, a row containing `partitionColumns` is joined at the end.
+      //  (3) Finally, a row containing `fileConstantMetadataColumns` is also 
joined at the end.
+      // By placing `fileFormatReaderGeneratedMetadataColumns` before 
`partitionColumns` and
+      // `fileConstantMetadataColumns` in the `outputAttributes` we make these 
row operations
+      // simpler and more efficient.
+      val outputAttributes = readDataColumns ++ 
fileFormatReaderGeneratedMetadataColumns ++
+        partitionColumns ++ fileConstantMetadataColumns
 
       val scan =
         FileSourceScanExec(
           fsRelation,
           outputAttributes,
-          outputSchema,
+          outputDataSchema,
           partitionKeyFilters.toSeq,
           bucketSet,
           None,
@@ -239,10 +265,20 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
 
       // extra Project node: wrap flat metadata columns to a metadata struct
       val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
+        val structColumns = metadataColumns.map { col => col.name match {
+            case FileFormat.FILE_PATH | FileFormat.FILE_NAME | 
FileFormat.FILE_SIZE |
+                 FileFormat.FILE_MODIFICATION_TIME =>
+              col
+            case FileFormat.ROW_INDEX =>
+              fileFormatReaderGeneratedMetadataColumns
+                .find(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
+                .get.withName(FileFormat.ROW_INDEX)
+          }
+        }
         val metadataAlias =
-          Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId = 
metadataStruct.exprId)
+          Alias(CreateStruct(structColumns), METADATA_NAME)(exprId = 
metadataStruct.exprId)
         execution.ProjectExec(
-          scan.output.dropRight(metadataColumns.length) :+ metadataAlias, scan)
+          readDataColumns ++ partitionColumns :+ metadataAlias, scan)
       }.getOrElse(scan)
 
       val afterScanFilter = 
afterScanFilters.toSeq.reduceOption(expressions.And)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 291b98fb37c..43699c1b6b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -69,7 +69,7 @@ case class LogicalRelation(
   }
 
   override lazy val metadataOutput: Seq[AttributeReference] = relation match {
-    case _: HadoopFsRelation =>
+    case relation: HadoopFsRelation =>
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
       def isOutputColumn(col: AttributeReference): Boolean = {
@@ -78,7 +78,7 @@ case class LogicalRelation(
       // filter out the metadata struct column if it has the name conflicting 
with output columns.
       // if the file has a column "_metadata",
       // then the data column should be returned not the metadata struct column
-      Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+      
Seq(FileFormat.createFileMetadataCol(relation.fileFormat)).filterNot(isOutputColumn)
     case _ => Nil
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala
new file mode 100644
index 00000000000..1512b6da1e8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+
+object RowIndexUtil {
+  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
+    sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) =>
+      field.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+    } match {
+      case Some((field: StructField, idx: Int)) =>
+        if (field.dataType != LongType) {
+          throw new 
RuntimeException(s"${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} must be of " +
+            "LongType")
+        }
+        idx
+      case _ => -1
+    }
+  }
+
+  def isNeededForSchema(sparkSchema: StructType): Boolean = {
+    findRowIndexColumnIndexInSchema(sparkSchema) >= 0
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 513379d23d6..c20063333c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -366,9 +366,11 @@ class ParquetFileFormat
         } else {
           new ParquetRecordReader[InternalRow](readSupport)
         }
-        val iter = new RecordReaderIterator[InternalRow](reader)
+        val readerWithRowIndexes = 
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+            requiredSchema)
+        val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
         try {
-          reader.initialize(split, hadoopAttemptContext)
+          readerWithRowIndexes.initialize(split, hadoopAttemptContext)
 
           val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
           val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
new file mode 100644
index 00000000000..fb1f6a417f4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.parquet.column.page.PageReadStore
+import org.apache.parquet.hadoop.ParquetRecordReader
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, RowIndexUtil}
+import 
org.apache.spark.sql.execution.datasources.RowIndexUtil.findRowIndexColumnIndexInSchema
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
+import org.apache.spark.sql.types.StructType
+
+
+object ParquetRowIndexUtil {
+  /**
+   * Generate row indexes for vectorized readers.
+  */
+  class RowIndexGenerator(rowIndexColumnIdx: Int) {
+    var rowIndexIterator: Iterator[Long] = _
+
+    /** For Parquet only: initialize the generator using provided 
PageReadStore. */
+    def initFromPageReadStore(pages: PageReadStore): Unit = {
+      if (!pages.getRowIndexOffset.isPresent) {
+        throw new IOException("PageReadStore returned no row index offset.")
+      }
+      val startingRowIdx: Long = pages.getRowIndexOffset.get()
+      if (pages.getRowIndexes.isPresent) {
+        // The presence of `getRowIndexes` indicates that page skipping is 
effective and only
+        // a subset of rows in the row group is going to be read. Note that 
there is a name
+        // collision here: these row indexes (unlike ones this class is 
generating) are counted
+        // starting from 0 in each of the row groups.
+        rowIndexIterator = pages.getRowIndexes.get.asScala.map(idx => idx + 
startingRowIdx)
+      } else {
+        val numRowsInRowGroup = pages.getRowCount
+        rowIndexIterator = (startingRowIdx until startingRowIdx + 
numRowsInRowGroup).iterator
+      }
+    }
+
+    def populateRowIndex(columnVectors: Array[ParquetColumnVector], numRows: 
Int): Unit = {
+      populateRowIndex(columnVectors(rowIndexColumnIdx).getValueVector, 
numRows)
+    }
+
+    def populateRowIndex(columnVector: WritableColumnVector, numRows: Int): 
Unit = {
+      for (i <- 0 until numRows) {
+        columnVector.putLong(i, rowIndexIterator.next())
+      }
+    }
+  }
+
+  def createGeneratorIfNeeded(sparkSchema: StructType): RowIndexGenerator = {
+    val columnIdx = findRowIndexColumnIndexInSchema(sparkSchema)
+    if (columnIdx >= 0) new RowIndexGenerator(columnIdx)
+    else null
+  }
+
+  /**
+   * A wrapper for `ParquetRecordReader` that sets row index column to the 
correct value in
+   * the returned InternalRow. Used in combination with non-vectorized 
(parquet-mr) Parquet reader.
+   */
+  private class RecordReaderWithRowIndexes(
+      parent: ParquetRecordReader[InternalRow],
+      rowIndexColumnIdx: Int)
+    extends RecordReader[Void, InternalRow] {
+
+    override def initialize(
+        inputSplit: InputSplit,
+        taskAttemptContext: TaskAttemptContext): Unit = {
+      parent.initialize(inputSplit, taskAttemptContext)
+    }
+
+    override def nextKeyValue(): Boolean = parent.nextKeyValue()
+
+    override def getCurrentKey: Void = parent.getCurrentKey
+
+    override def getCurrentValue: InternalRow = {
+      val row = parent.getCurrentValue
+      row.setLong(rowIndexColumnIdx, parent.getCurrentRowIndex)
+      row
+    }
+
+    override def getProgress: Float = parent.getProgress
+
+    override def close(): Unit = parent.close()
+  }
+
+  def addRowIndexToRecordReaderIfNeeded(
+      reader: ParquetRecordReader[InternalRow],
+      sparkSchema: StructType): RecordReader[Void, InternalRow] = {
+    val rowIndexColumnIdx = 
RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
+    if (rowIndexColumnIdx >= 0) {
+      new RecordReaderWithRowIndexes(reader, rowIndexColumnIdx)
+    } else {
+      reader
+    }
+  }
+
+  def isRowIndexColumn(column: ParquetColumn): Boolean = {
+    column.path.length == 1 && column.path.last == 
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 0f6e5201df8..121ebe1cfa2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -295,10 +295,12 @@ case class ParquetPartitionReaderFactory(
     } else {
       new ParquetRecordReader[InternalRow](readSupport)
     }
-    val iter = new RecordReaderIterator(reader)
+    val readerWithRowIndexes = 
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(
+      reader, readDataSchema)
+    val iter = new RecordReaderIterator(readerWithRowIndexes)
     // SPARK-23457 Register a task completion listener before `initialization`.
     taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-    reader
+    readerWithRowIndexes
   }
 
   private def createVectorizedReader(file: PartitionedFile): 
VectorizedParquetRecordReader = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 0457e8be715..ff0b38880fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, 
PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, 
PartitioningAwareFileIndex, RowIndexUtil}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetReadSupport, ParquetWriteSupport}
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.internal.SQLConf
@@ -50,7 +50,10 @@ case class ParquetScan(
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
     // so file should not be split across multiple tasks.
-    pushedAggregate.isEmpty
+    pushedAggregate.isEmpty &&
+      // SPARK-39634: Allow file splitting in combination with row index 
generation once
+      // the fix for PARQUET-2161 is available.
+      !RowIndexUtil.isNeededForSchema(readSchema)
   }
 
   override def readSchema(): StructType = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 7b177f3b67d..af90b692a70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -70,7 +70,8 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
         // filter out the metadata struct column if it has the name 
conflicting with output columns.
         // if the file has a column "_metadata",
         // then the data column should be returned not the metadata struct 
column
-        Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+        Seq(FileFormat.createFileMetadataCol(
+          
dataSource.providingInstance().asInstanceOf[FileFormat])).filterNot(isOutputColumn)
       case _ => Nil
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala
new file mode 100644
index 00000000000..38fff24abe5
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+
+object MetadataStructBenchmark extends SqlBasedBenchmark {
+  import spark.implicits._
+
+  private val NUM_ROWS = 5000000
+  private val NUM_ITERS = 32
+
+  private def withTempData(format: String = "parquet")(f: DataFrame => Unit): 
Unit = {
+    val dir = Utils.createTempDir()
+    dir.delete()
+    try {
+      spark.range(0, NUM_ROWS, 1, 1).toDF("id")
+        .withColumn("num1", $"id" + 10)
+        .withColumn("num2", $"id" / 10)
+        .withColumn("str", lit("a sample string ") + $"id".cast("string"))
+        .write.format(format).save(dir.getAbsolutePath)
+      val df = spark.read.format(format).load(dir.getAbsolutePath)
+      f(df)
+    } finally {
+      Utils.deleteRecursively(dir)
+    }
+  }
+
+  private def addCase(benchmark: Benchmark, df: DataFrame, metadataCol: 
Option[String]): Unit = {
+    benchmark.addCase(metadataCol.getOrElse("no metadata columns")) { _ =>
+      df.select("*", metadataCol.toSeq: _*).noop()
+    }
+  }
+
+  private def metadataBenchmark(name: String, format: String): Unit = {
+    withTempData(format) { df =>
+      val metadataCols = df.select(FileFormat.METADATA_NAME).schema
+        .fields.head.dataType.asInstanceOf[StructType].fieldNames
+
+      val benchmark = new Benchmark(name, NUM_ROWS, NUM_ITERS, output = output)
+
+      addCase(benchmark, df, None)
+      for (metadataCol <- metadataCols) {
+        addCase(benchmark, df, 
Some(s"${FileFormat.METADATA_NAME}.$metadataCol"))
+      }
+      addCase(benchmark, df, Some(FileFormat.METADATA_NAME))
+
+      benchmark.run()
+    }
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Metadata Struct Benchmark") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        metadataBenchmark("Vectorized Parquet", "parquet")
+      }
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        metadataBenchmark("Parquet-mr", "parquet")
+      }
+      metadataBenchmark("JSON", "json")
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
new file mode 100644
index 00000000000..af2d56159bf
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class FileMetadataStructRowIndexSuite extends QueryTest with 
SharedSparkSession {
+  import testImplicits._
+
+  val EXPECTED_ROW_ID_COL = "expected_row_idx"
+  val EXPECTED_EXTRA_COL = "expected_extra_col"
+  val EXPECTED_PARTITION_COL = "experted_pb_col"
+  val NUM_ROWS = 100
+
+  def withReadDataFrame(
+         format: String,
+         partitionCol: String = null,
+         extraCol: String = "ec",
+         extraSchemaFields: Seq[StructField] = Seq.empty)
+      (f: DataFrame => Unit): Unit = {
+    withTempPath { path =>
+      val baseDf = spark.range(0, NUM_ROWS, 1, 1).toDF("id")
+        .withColumn(extraCol, $"id" + lit(1000 * 1000))
+        .withColumn(EXPECTED_EXTRA_COL, col(extraCol))
+      val writeSchema: StructType = if (partitionCol != null) {
+        val writeDf = baseDf
+          .withColumn(partitionCol, ($"id" / 10).cast("int") + lit(1000))
+          .withColumn(EXPECTED_PARTITION_COL, col(partitionCol))
+          .withColumn(EXPECTED_ROW_ID_COL, $"id" % 10)
+        
writeDf.write.format(format).partitionBy(partitionCol).save(path.getAbsolutePath)
+        writeDf.schema
+      } else {
+        val writeDf = baseDf
+          .withColumn(EXPECTED_ROW_ID_COL, $"id")
+        writeDf.write.format(format).save(path.getAbsolutePath)
+        writeDf.schema
+      }
+      val readSchema: StructType = new StructType(writeSchema.fields ++ 
extraSchemaFields)
+      val readDf = 
spark.read.format(format).schema(readSchema).load(path.getAbsolutePath)
+      f(readDf)
+    }
+  }
+
+  private val allMetadataCols = Seq(
+    FileFormat.FILE_PATH,
+    FileFormat.FILE_SIZE,
+    FileFormat.FILE_MODIFICATION_TIME,
+    FileFormat.ROW_INDEX
+  )
+
+  /** Identifies the names of all the metadata columns present in the schema. 
*/
+  private def collectMetadataCols(struct: StructType): Seq[String] = {
+    struct.fields.flatMap { field => field.dataType match {
+      case s: StructType => collectMetadataCols(s)
+      case _ if allMetadataCols.contains(field.name) => Some(field.name)
+      case _ => None
+    }}
+  }
+
+  for (useVectorizedReader <- Seq(false, true))
+  for (useOffHeapMemory <- Seq(useVectorizedReader, false).distinct)
+  for (partitioned <- Seq(false, true)) {
+    val label = Seq(
+        { if (useVectorizedReader) "vectorized" else "parquet-mr"},
+        { if (useOffHeapMemory) "off-heap" else "" },
+        { if (partitioned) "partitioned" else "" }
+      ).filter(_.nonEmpty).mkString(", ")
+    test(s"parquet ($label) - read _metadata.row_index") {
+      withSQLConf(
+          SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
useVectorizedReader.toString,
+          SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> 
useOffHeapMemory.toString) {
+        withReadDataFrame("parquet", partitionCol = "pb") { df =>
+          val res = df.select("*", 
s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+            .where(s"$EXPECTED_ROW_ID_COL != ${FileFormat.ROW_INDEX}")
+          assert(res.count() == 0)
+        }
+      }
+    }
+  }
+
+  test("supported file format - read _metadata struct") {
+    withReadDataFrame("parquet") { df =>
+      val withMetadataStruct = df.select("*", FileFormat.METADATA_NAME)
+
+      // `_metadata.row_index` column is present when selecting `_metadata` as 
a whole.
+      val metadataCols = collectMetadataCols(withMetadataStruct.schema)
+      assert(metadataCols.contains(FileFormat.ROW_INDEX))
+    }
+  }
+
+  test("unsupported file format - read _metadata struct") {
+    withReadDataFrame("orc") { df =>
+      val withMetadataStruct = df.select("*", FileFormat.METADATA_NAME)
+
+      // Metadata struct can be read without an error.
+      withMetadataStruct.collect()
+
+      // Schema does not contain row index column, but contains all the 
remaining metadata columns.
+      val metadataCols = collectMetadataCols(withMetadataStruct.schema)
+      assert(!metadataCols.contains(FileFormat.ROW_INDEX))
+      assert(allMetadataCols.intersect(metadataCols).size == 
allMetadataCols.size - 1)
+    }
+  }
+
+  test("unsupported file format - read _metadata.row_index") {
+    withReadDataFrame("orc") { df =>
+      val ex = intercept[AnalysisException] {
+        df.select("*", s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+      }
+      assert(ex.getMessage.contains("No such struct field row_index"))
+    }
+  }
+
+  for (useVectorizedReader <- Seq(true, false)) {
+    val label = if (useVectorizedReader) "vectorized" else "parquet-mr"
+    test(s"parquet ($label) - use mixed case for column name") {
+      withSQLConf(
+          SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
useVectorizedReader.toString) {
+        withReadDataFrame("parquet") { df =>
+          val mixedCaseRowIndex = "RoW_InDeX"
+          assert(mixedCaseRowIndex.toLowerCase() == FileFormat.ROW_INDEX)
+
+          assert(df.select("*", 
s"${FileFormat.METADATA_NAME}.$mixedCaseRowIndex")
+            .where(s"$EXPECTED_ROW_ID_COL != $mixedCaseRowIndex")
+            .count == 0)
+        }
+      }
+    }
+  }
+
+  test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present 
in a table") {
+    // File format supporting row index generation populates the column with 
row indexes.
+    withReadDataFrame("parquet", extraSchemaFields =
+        Seq(StructField(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, 
LongType))) { df =>
+      assert(df
+          .where(col(EXPECTED_ROW_ID_COL) === 
col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+          .count == NUM_ROWS)
+    }
+
+    // File format not supporting row index generation populates missing 
column with nulls.
+    withReadDataFrame("json", extraSchemaFields =
+        Seq(StructField(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, 
LongType)))  { df =>
+      assert(df
+          .where(col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME).isNull)
+          .count == NUM_ROWS)
+    }
+  }
+
+  test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a 
table") {
+    withReadDataFrame("parquet", extraCol = 
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
+      // Values of FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME column are 
always populated with
+      // generated row indexes, rather than read from the file.
+      // TODO(SPARK-40059): Allow users to include columns named
+      //                    FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in 
their schemas.
+      assert(df
+        .where(col(EXPECTED_ROW_ID_COL) === 
col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+        .count == NUM_ROWS)
+
+      // Column cannot be read in combination with _metadata.row_index.
+      intercept[AnalysisException](df.select("*", 
FileFormat.METADATA_NAME).collect())
+      intercept[AnalysisException](df
+        .select("*", 
s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}").collect())
+    }
+  }
+
+  test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition 
col") {
+    withReadDataFrame("parquet", partitionCol = 
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
+      // Column values are set for each partition, rather than populated with 
generated row indexes.
+      assert(df
+        .where(col(EXPECTED_PARTITION_COL) === 
col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+        .count == NUM_ROWS)
+
+      // Column cannot be read in combination with _metadata.row_index.
+      intercept[AnalysisException](df.select("*", 
FileFormat.METADATA_NAME).collect())
+      intercept[AnalysisException](df
+        .select("*", 
s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}").collect())
+    }
+  }
+
+  test(s"cannot make ${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} a 
partition column") {
+    withTempPath { srcPath =>
+      spark.range(0, 10, 1, 
1).toDF("id").write.parquet(srcPath.getAbsolutePath)
+
+      withTempPath { dstPath =>
+        intercept[AnalysisException] {
+          spark.read.parquet(srcPath.getAbsolutePath)
+            .select("*", FileFormat.METADATA_NAME)
+            .write
+            
.partitionBy(s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+            .save(dstPath.getAbsolutePath)
+        }
+      }
+    }
+  }
+
+  test(s"read user created ${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} 
column") {
+    withReadDataFrame("parquet", partitionCol = "pb") { df =>
+      withTempPath { dir =>
+        // The `df` has 10 input files with 10 rows each. Therefore the 
`_metadata.row_index` values
+        // will be { 10 x 0, 10 x 1, ..., 10 x 9 }. We store all these values 
in a single file.
+        df.select("id", s"${FileFormat.METADATA_NAME}")
+          .coalesce(1)
+          .write.parquet(dir.getAbsolutePath)
+
+        assert(spark
+          .read.parquet(dir.getAbsolutePath)
+          .count == NUM_ROWS)
+
+        // The _metadata.row_index is returning data from the file, not 
generated metadata.
+        assert(spark
+          .read.parquet(dir.getAbsolutePath)
+          .select(s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+          .distinct.count == NUM_ROWS / 10)
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 6afea42ee83..2c56adbab94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -59,6 +59,30 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
   private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
 
+  private val METADATA_ROW_INDEX = "_metadata.row_index"
+
+  private val FILE_FORMAT = "fileFormat"
+
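+  // Row indexes are currently produced only for Parquet, so the expected _metadata row
+  // includes row_index only when the test file format is Parquet.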
+  private def getMetadataRow(f: Map[String, Any]): Row = f(FILE_FORMAT) match {
+    case "parquet" =>
+      Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE),
+        f(METADATA_FILE_MODIFICATION_TIME), f(METADATA_ROW_INDEX))
+    case _ =>
+      Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE),
+        f(METADATA_FILE_MODIFICATION_TIME))
+  }
+
+  private def getMetadataForFile(f: File): Map[String, Any] = {
+    Map(
+      METADATA_FILE_PATH -> f.toURI.toString,
+      METADATA_FILE_NAME -> f.getName,
+      METADATA_FILE_SIZE -> f.length(),
+      METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified()),
+      METADATA_ROW_INDEX -> 0,
+      FILE_FORMAT -> f.getName.split("\\.").last
+    )
+  }
+
   /**
    * This test wrapper will test for both row-based and column-based file formats:
    * (json and parquet) with nested schema:
@@ -101,21 +125,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
           val realF1 = new File(dir, "data/f1").listFiles()
             .filter(_.getName.endsWith(s".$testFileFormat")).head
 
-          // 3. create f0 and f1 metadata data
-          val f0Metadata = Map(
-            METADATA_FILE_PATH -> realF0.toURI.toString,
-            METADATA_FILE_NAME -> realF0.getName,
-            METADATA_FILE_SIZE -> realF0.length(),
-            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF0.lastModified())
-          )
-          val f1Metadata = Map(
-            METADATA_FILE_PATH -> realF1.toURI.toString,
-            METADATA_FILE_NAME -> realF1.getName,
-            METADATA_FILE_SIZE -> realF1.length(),
-            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF1.lastModified())
-          )
-
-          f(df, f0Metadata, f1Metadata)
+          f(df, getMetadataForFile(realF0), getMetadataForFile(realF1))
         }
       }
     }
@@ -232,10 +242,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     checkAnswer(
       df.select("_metadata"),
       Seq(
-        Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
-          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
-        Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
-          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+        Row(getMetadataRow(f0)),
+        Row(getMetadataRow(f1))
       )
     )
   }
@@ -348,11 +356,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
             df.select("name", "age", "_METADATA", "_metadata"),
             Seq(
               Row("jack", 24, Row(12345L, "uom"),
-                Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
-                  f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
+                getMetadataRow(f0)),
               Row("lily", 31, Row(54321L, "ucb"),
-                Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
-                  f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+                getMetadataRow(f1))
             )
           )
         } else {
@@ -492,12 +498,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       checkAnswer(
         newDF.select("*"),
         Seq(
-          Row("jack", 24, Row(12345L, "uom"),
-            Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
-              f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
-          Row("lily", 31, Row(54321L, "ucb"),
-            Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
-              f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+          Row("jack", 24, Row(12345L, "uom"), getMetadataRow(f0)),
+          Row("lily", 31, Row(54321L, "ucb"), getMetadataRow(f1))
         )
       )
 
@@ -505,10 +507,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       checkAnswer(
         newDF.select("_metadata"),
         Seq(
-          Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
-            f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
-          Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
-            f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+          Row(getMetadataRow(f0)),
+          Row(getMetadataRow(f1))
         )
       )
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
new file mode 100644
index 00000000000..c36ab49b5e3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.ParquetProperties._
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
+import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
+import org.apache.spark.sql.functions.{col, max, min}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StringType}
+
+class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
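+  /** Reads the row count of every row group from the footer of a single Parquet file. */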
+  private def readRowGroupRowCounts(path: String): Seq[Long] = {
+    ParquetFileReader.readFooter(spark.sessionState.newHadoopConf(), new Path(path))
+      .getBlocks.asScala.toSeq.map(_.getRowCount)
+  }
+
+  private def readRowGroupRowCounts(dir: File): Seq[Seq[Long]] = {
+    assert(dir.isDirectory)
+    dir.listFiles()
+      .filter { f => f.isFile && f.getName.endsWith("parquet") }
+      .map { f => readRowGroupRowCounts(f.getAbsolutePath) }
+  }
+
+  /**
+   * Do the files contain exactly one row group?
+   */
+  private def assertOneRowGroup(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach { rcs =>
+      assert(rcs.length == 1, "expected one row group per file")
+    }
+  }
+
+  /**
+   * Do the files have a good layout to test row group skipping (both range metadata filter, and
+   * by using min/max)?
+   */
+  private def assertTinyRowGroups(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach { rcs =>
+      assert(rcs.length > 1, "expected multiple row groups per file")
+      assert(rcs.last <= DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
+      assert(rcs.reverse.tail.distinct == Seq(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK),
+        "expected row groups with minimal row count")
+    }
+  }
+
+  /**
+   * Do the files have a good layout to test a combination of page skipping and row group skipping?
+   */
+  private def assertIntermediateRowGroups(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach { rcs =>
+      assert(rcs.length >= 3, "expected at least 3 row groups per file")
+      rcs.reverse.tail.foreach { rc =>
+        assert(rc > DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+          "expected row groups larger than minimal row count")
+      }
+    }
+  }
+
+  case class RowIndexTestConf(
+      numRows: Long = 10000L,
+      useMultipleFiles: Boolean = false,
+      useVectorizedReader: Boolean = true,
+      useSmallPages: Boolean = false,
+      useSmallRowGroups: Boolean = false,
+      useSmallSplits: Boolean = false,
+      useFilter: Boolean = false,
+      useDataSourceV2: Boolean = false) {
+
+    val NUM_MULTIPLE_FILES = 4
+    // The test doesn't work correctly if the number of records per file is uneven.
+    assert(!useMultipleFiles || (numRows % NUM_MULTIPLE_FILES == 0))
+
+    def numFiles: Int = if (useMultipleFiles) { NUM_MULTIPLE_FILES } else { 1 }
+
+    def rowGroupSize: Long = if (useSmallRowGroups) {
+      if (useSmallPages) {
+        // Each file will contain multiple row groups. All of them (except for the last one)
+        // will contain more than DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, so that individual
+        // pages within the row group can be skipped.
+        2048L
+      } else {
+        // Each file will contain multiple row groups. All of them (except for the last one)
+        // will contain exactly DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+        64L
+      }
+    } else {
+      // Each file will contain a single row group.
+      DEFAULT_BLOCK_SIZE
+    }
+
+    def pageSize: Long = if (useSmallPages) {
+      // Each page (except for the last one for each column) will contain exactly
+      // DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+      64L
+    } else {
+      DEFAULT_PAGE_SIZE
+    }
+
+    def writeFormat: String = "parquet"
+    def readFormat: String = if (useDataSourceV2) {
+      classOf[ParquetDataSourceV2].getCanonicalName
+    } else {
+      "parquet"
+    }
+
+    assert(useSmallRowGroups || !useSmallSplits)
+    def filesMaxPartitionBytes: Long = if (useSmallSplits) {
+      256L
+    } else {
+      SQLConf.FILES_MAX_PARTITION_BYTES.defaultValue.get
+    }
+
+    def desc: String = {
+      { if (useVectorizedReader) Seq("vectorized reader") else Seq("parquet-mr reader") } ++
+      { if (useMultipleFiles) Seq("many files") else Seq.empty[String] } ++
+      { if (useFilter) Seq("filtered") else Seq.empty[String] } ++
+      { if (useSmallPages) Seq("small pages") else Seq.empty[String] } ++
+      { if (useSmallRowGroups) Seq("small row groups") else Seq.empty[String] } ++
+      { if (useSmallSplits) Seq("small splits") else Seq.empty[String] } ++
+      { if (useDataSourceV2) Seq("datasource v2") else Seq.empty[String] }
+    }.mkString(", ")
+
+    def sqlConfs: Seq[(String, String)] = Seq(
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString,
+      SQLConf.FILES_MAX_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString
+    ) ++ { if (useDataSourceV2) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "") else Seq.empty }
+  }
+
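+  // Generate one test per combination of reader implementation, data source API version,
+  // row group / page layout, filtering, and split size.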
+  for (useVectorizedReader <- Seq(true, false))
+  for (useDataSourceV2 <- Seq(true, false))
+  for (useSmallRowGroups <- Seq(true, false))
+  for (useSmallPages <- Seq(true, false))
+  for (useFilter <- Seq(true, false))
+  for (useSmallSplits <- Seq(useSmallRowGroups, false).distinct) {
+    val conf = RowIndexTestConf(useVectorizedReader = useVectorizedReader,
+      useDataSourceV2 = useDataSourceV2, useSmallRowGroups = useSmallRowGroups,
+      useSmallPages = useSmallPages, useFilter = useFilter,
+      useSmallSplits = useSmallSplits)
+    testRowIndexGeneration("row index generation", conf)
+  }
+
+  private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
+    test (s"$label - ${conf.desc}") {
+      withSQLConf(conf.sqlConfs: _*) {
+        withTempPath { path =>
+          val rowIndexColName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+          val numRecordsPerFile = conf.numRows / conf.numFiles
+          val (skipCentileFirst, skipCentileMidLeft, skipCentileMidRight, skipCentileLast) =
+            (0.2, 0.4, 0.6, 0.8)
+          val expectedRowIdxCol = "expected_rowIdx_col"
+          val df = spark.range(0, conf.numRows, 1, conf.numFiles).toDF("id")
+            .withColumn("dummy_col", ($"id" / 55).cast("int"))
+            .withColumn(expectedRowIdxCol, ($"id" % numRecordsPerFile).cast("int"))
+
+          // With row index in schema.
+          val schemaWithRowIdx = df.schema.add(rowIndexColName, LongType, nullable = true)
+
+          df.write
+            .format(conf.writeFormat)
+            .option(ParquetOutputFormat.BLOCK_SIZE, conf.rowGroupSize)
+            .option(ParquetOutputFormat.PAGE_SIZE, conf.pageSize)
+            .option(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, conf.pageSize)
+            .save(path.getAbsolutePath)
+          val dfRead = spark.read
+            .format(conf.readFormat)
+            .schema(schemaWithRowIdx)
+            .load(path.getAbsolutePath)
+
+          // Verify that the produced files are laid out as expected.
+          if (conf.useSmallRowGroups) {
+            if (conf.useSmallPages) {
+              assertIntermediateRowGroups(path)
+            } else {
+              assertTinyRowGroups(path)
+            }
+          } else {
+            assertOneRowGroup(path)
+          }
+
+          val dfToAssert = if (conf.useFilter) {
+            // Add a filter such that we skip 60% of the records:
+            // [0%, 20%], [40%, 60%], [80%, 100%]
+            dfRead.filter((
+              $"id" >= (skipCentileFirst * conf.numRows).toInt &&
+                $"id" < (skipCentileMidLeft * conf.numRows).toInt) || (
+              $"id" >= (skipCentileMidRight * conf.numRows).toInt &&
+                $"id" < (skipCentileLast * conf.numRows).toInt))
+          } else {
+            dfRead
+          }
+
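+          // Collect the number of input partitions and output rows from the scan node,
+          // covering both the DSv1 (FileSourceScanExec) and DSv2 (BatchScanExec) plans.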
+          var numPartitions: Long = 0
+          var numOutputRows: Long = 0
+          dfToAssert.collect()
+          dfToAssert.queryExecution.executedPlan.foreach {
+            case b: BatchScanExec =>
+              numPartitions += b.inputRDD.partitions.length
+              numOutputRows += b.metrics("numOutputRows").value
+            case f: FileSourceScanExec =>
+              numPartitions += f.inputRDD.partitions.length
+              numOutputRows += f.metrics("numOutputRows").value
+            case _ =>
+          }
+          assert(numPartitions > 0)
+          assert(numOutputRows > 0)
+
+          if (conf.useSmallSplits) {
+            // SPARK-39634: Until the fix for PARQUET-2161 is available, it is not possible to
+            // split Parquet files into multiple partitions while generating row indexes.
+            // assert(numPartitions >= 2 * conf.numFiles)
+          }
+
+          // Assert that every rowIdx value matches the value in `expectedRowIdx`.
+          assert(dfToAssert.filter(s"$rowIndexColName != $expectedRowIdxCol")
+            .count() == 0)
+
+          if (conf.useFilter) {
+            if (conf.useSmallRowGroups) {
+              assert(numOutputRows < conf.numRows)
+            }
+
+            val minMaxRowIndexes = dfToAssert.select(
+              max(col(rowIndexColName)),
+              min(col(rowIndexColName))).collect()
+            val (expectedMaxRowIdx, expectedMinRowIdx) = if (conf.numFiles == 1) {
+              // When there is a single file, we still have row group skipping,
+              // but that should not affect the produced rowIdx.
+              (conf.numRows * skipCentileLast - 1, conf.numRows * skipCentileFirst)
+            } else {
+              // For simplicity, the chosen filter skips the whole files.
+              // Thus all unskipped files will have the same max and min rowIdx values.
+              (numRecordsPerFile - 1, 0)
+            }
+            assert(minMaxRowIndexes(0).get(0) == expectedMaxRowIdx)
+            assert(minMaxRowIndexes(0).get(1) == expectedMinRowIdx)
+            if (!conf.useMultipleFiles) {
+              val skippedValues = List.range(0, (skipCentileFirst * conf.numRows).toInt) ++
+                List.range((skipCentileMidLeft * conf.numRows).toInt,
+                  (skipCentileMidRight * conf.numRows).toInt) ++
+                List.range((skipCentileLast * conf.numRows).toInt, conf.numRows)
+              // rowIdx column should not have any of the `skippedValues`.
+              assert(dfToAssert
+                .filter(col(rowIndexColName).isin(skippedValues: _*)).count() == 0)
+            }
+          } else {
+            assert(numOutputRows == conf.numRows)
+            // When there is no filter, the rowIdx values should be in the range
+            // [0, `numRecordsPerFile`).
+            val expectedRowIdxValues = List.range(0, numRecordsPerFile)
+            assert(dfToAssert.filter(col(rowIndexColName).isin(expectedRowIdxValues: _*))
+              .count() == conf.numRows)
+          }
+        }
+      }
+    }
+  }
+
+  for (useDataSourceV2 <- Seq(true, false)) {
+    val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
+
+    test(s"invalid row index column type - ${conf.desc}") {
+      withSQLConf(conf.sqlConfs: _*) {
+        withTempPath { path =>
+          val df = spark.range(0, 10, 1, 1).toDF("id")
+          val schemaWithRowIdx = df.schema
+            .add(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, StringType)
+
+          df.write
+            .format(conf.writeFormat)
+            .save(path.getAbsolutePath)
+
+          val dfRead = spark.read
+            .format(conf.readFormat)
+            .schema(schemaWithRowIdx)
+            .load(path.getAbsolutePath)
+
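+          // Declaring the row index column with a non-LongType should make the read fail,
+          // and the error message should mention the temporary row index column name.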
+          val exception = intercept[Exception](dfRead.collect())
+          assert(exception.getMessage.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
