This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 95aebcbf100 [SPARK-37980][SQL] Extend METADATA column to support row
indexes for Parquet files
95aebcbf100 is described below
commit 95aebcbf100de1dbedd32626ce67bd01014c973e
Author: Ala Luszczak <[email protected]>
AuthorDate: Tue Aug 16 21:03:42 2022 +0800
[SPARK-37980][SQL] Extend METADATA column to support row indexes for
Parquet files
### What changes were proposed in this pull request?
This change adds a `row_index` column to the `_metadata` struct. This column
allows us to uniquely identify rows read from a given file with an index
number: the n-th row in a given file will be assigned row index `n-1` in every
scan of the file, irrespective of any file splitting or data skipping in use.
The new column requires file-format-specific support. This change
introduces Parquet support; other formats can follow later.
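
As a usage sketch (not part of this patch; the path below is a placeholder),
the new field can be read like any other `_metadata` field:

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: select the per-file row index next to the data columns.
val spark = SparkSession.builder().appName("row-index-sketch").getOrCreate()
import spark.implicits._

spark.read.parquet("/tmp/example_table") // placeholder path
  .select($"*", $"_metadata.row_index")
  .show()
```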
### Why are the changes needed?
Row indexes can be used in a variety of ways. A (fileName, rowIndex) tuple
uniquely identifies a row in a table. This information can be used to mark
rows, for example to build an indexer; a sketch of that idea follows.
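
For illustration only (assuming a `spark` session is in scope; the path and
the derived column names are placeholders), such a row marker could be
materialized like this:

```scala
import org.apache.spark.sql.functions.col

// Hedged sketch: (file_name, row_index) uniquely identifies a row in the
// table and could be persisted to back an index over the original files.
val withRowId = spark.read.parquet("/tmp/example_table") // placeholder path
  .withColumn("source_file", col("_metadata.file_name"))
  .withColumn("source_row_index", col("_metadata.row_index"))
withRowId.show()
```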
### Does this PR introduce _any_ user-facing change?
Yes. With this change users will be able to access the
`_metadata.row_index` metadata column when reading Parquet data. The schema of
the `_metadata` column remains unchanged for file formats without row index support.
### How was this patch tested?
* Added `FileMetadataStructRowIndexSuite.scala` to make sure the feature works
correctly in different scenarios (supported/unsupported file format,
batch/record reads, on/off-heap memory, ...); a minimal check in this spirit
is sketched below.
* Added `ParquetRowIndexSuite.scala` to make sure the row indexes are
generated correctly for Parquet files in conjunction with any combination of
data skipping features.
* Extended `FileMetadataStructSuite.scala` to account for the new column in
the `_metadata` struct.
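
For reference, a minimal check in the spirit of these suites (assuming a
`spark` session is in scope; the path and row count are placeholders) might
look like:

```scala
import org.apache.spark.sql.functions.col

// Hedged sketch: with a single-partition write there is one input file, so
// `_metadata.row_index` should equal the sequential `id` column on read.
spark.range(0, 100, 1, numPartitions = 1).toDF("id")
  .write.mode("overwrite").parquet("/tmp/row_index_check") // placeholder path

val mismatches = spark.read.parquet("/tmp/row_index_check")
  .where(col("id") =!= col("_metadata.row_index"))
  .count()
assert(mismatches == 0)
```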
Closes #37228 from ala/row-idx-v4.
Lead-authored-by: Ala Luszczak <[email protected]>
Co-authored-by: IonutBoicu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
...ParquetNestedSchemaPruningBenchmark-results.txt | 60 ++--
.../datasources/parquet/ParquetColumnVector.java | 5 +
.../parquet/VectorizedParquetRecordReader.java | 14 +
.../spark/sql/execution/DataSourceScanExec.scala | 20 +-
.../sql/execution/datasources/DataSource.scala | 2 +-
.../sql/execution/datasources/FileFormat.scala | 46 ++-
.../sql/execution/datasources/FileScanRDD.scala | 9 +-
.../execution/datasources/FileSourceStrategy.scala | 62 +++-
.../execution/datasources/LogicalRelation.scala | 4 +-
.../sql/execution/datasources/RowIndexUtil.scala | 40 +++
.../datasources/parquet/ParquetFileFormat.scala | 6 +-
.../datasources/parquet/ParquetRowIndexUtil.scala | 120 ++++++++
.../v2/parquet/ParquetPartitionReaderFactory.scala | 6 +-
.../datasources/v2/parquet/ParquetScan.scala | 7 +-
.../execution/streaming/StreamingRelation.scala | 3 +-
.../benchmark/MetadataStructBenchmark.scala | 85 ++++++
.../FileMetadataStructRowIndexSuite.scala | 235 ++++++++++++++++
.../datasources/FileMetadataStructSuite.scala | 66 ++---
.../datasources/parquet/ParquetRowIndexSuite.scala | 313 +++++++++++++++++++++
19 files changed, 1000 insertions(+), 103 deletions(-)
diff --git
a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
index ec149dce313..ca035973065 100644
--- a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
@@ -2,52 +2,52 @@
Nested Schema Pruning Benchmark For Parquet
================================================================================================
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Selection:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column                             104            114          14        9.6        104.0       1.0X
-Nested column                                111            121          20        9.0        110.5       0.9X
-Nested column in array                       321            328           8        3.1        320.5       0.3X
+Top-level column                              91            131          30       10.9         91.4       1.0X
+Nested column                                 73            109          32       13.6         73.5       1.2X
+Nested column in array                       248            266          28        4.0        248.1       0.4X
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Limiting:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column                             373            384           8        2.7        373.4       1.0X
-Nested column                                386            400          10        2.6        385.8       1.0X
-Nested column in array                       831            859          22        1.2        831.2       0.4X
+Top-level column                             268            386         154        3.7        268.4       1.0X
+Nested column                                271            305          22        3.7        271.3       1.0X
+Nested column in array                       648            737          49        1.5        648.0       0.4X
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Repartitioning:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column                             353            366           9        2.8        353.1       1.0X
-Nested column                                366            373           6        2.7        366.3       1.0X
-Nested column in array                       772            793          14        1.3        771.8       0.5X
+Top-level column                             227            259          24        4.4        226.6       1.0X
+Nested column                                271            429         182        3.7        271.2       0.8X
+Nested column in array                       644            751         114        1.6        644.1       0.4X
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Repartitioning by exprs:            Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column                             333            339           5        3.0        333.4       1.0X
-Nested column                                370            388          12        2.7        369.7       0.9X
-Nested column in array                       784            830          21        1.3        784.5       0.4X
+Top-level column                             263            306          29        3.8        262.8       1.0X
+Nested column                                323            385          48        3.1        322.5       0.8X
+Nested column in array                       637            789         195        1.6        637.1       0.4X
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Sample:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column                             123            149          30        8.1        123.1       1.0X
-Nested column                                133            157          37        7.5        132.8       0.9X
-Nested column in array                       382            420          16        2.6        381.6       0.3X
+Top-level column                              84            129          43       12.0         83.5       1.0X
+Nested column                                 83             99          13       12.1         82.7       1.0X
+Nested column in array                       312            366          47        3.2        312.2       0.3X
-OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_221-b11 on Linux 5.4.0-122-generic
+Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
Sorting:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column                             567            579          10        1.8        566.6       1.0X
-Nested column                                637            652          15        1.6        637.1       0.9X
-Nested column in array                      1171           1212          28        0.9       1171.4       0.5X
+Top-level column                             342            420          54        2.9        342.2       1.0X
+Nested column                                423            441          17        2.4        422.6       0.8X
+Nested column in array                       821            870          28        1.2        821.1       0.4X
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index 47774e0a397..5272151acf2 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -75,6 +75,11 @@ final class ParquetColumnVector {
this.isPrimitive = column.isPrimitive();
if (missingColumns.contains(column)) {
+ if (ParquetRowIndexUtil.isRowIndexColumn(column)) {
+ // The values of row index column are going to be generated by the
reader instead.
+ return;
+ }
+
if (defaultValue == null) {
vector.setAllNull();
return;
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 63fdec4056f..97f739c5bf2 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -132,6 +132,11 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
*/
private boolean returnColumnarBatch;
+ /**
+ * Populates the row index column if needed.
+ */
+ private ParquetRowIndexUtil.RowIndexGenerator rowIndexGenerator = null;
+
/**
* The memory mode of the columnarBatch
*/
@@ -275,6 +280,8 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
(ConstantColumnVector) vectors[i + partitionIdx], partitionValues,
i);
}
}
+
+ rowIndexGenerator =
ParquetRowIndexUtil.createGeneratorIfNeeded(sparkSchema);
}
private void initBatch() {
@@ -324,6 +331,10 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
}
cv.assemble();
}
+ // If needed, compute row indexes within a file.
+ if (rowIndexGenerator != null) {
+ rowIndexGenerator.populateRowIndex(columnVectors, num);
+ }
rowsReturned += num;
columnarBatch.setNumRows(num);
@@ -395,6 +406,9 @@ public class VectorizedParquetRecordReader extends
SpecificParquetRecordReaderBa
throw new IOException("expecting more rows but reached last block. Read "
+ rowsReturned + " out of " + totalRowCount);
}
+ if (rowIndexGenerator != null) {
+ rowIndexGenerator.initFromPageReadStore(pages);
+ }
for (ParquetColumnVector cv : columnVectors) {
initColumnReader(pages, cv);
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5950136e79a..fdb49bd7674 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
+import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector,
OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -214,8 +214,17 @@ trait FileSourceScanLike extends DataSourceScanExec {
requiredSchema = requiredSchema,
partitionSchema = relation.partitionSchema,
relation.sparkSession.sessionState.conf).map { vectorTypes =>
- // for column-based file format, append metadata column's vector type
classes if any
- vectorTypes ++
Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
+ vectorTypes ++
+ // for column-based file format, append metadata column's vector
type classes if any
+ metadataColumns.map { metadataCol =>
+ if (FileFormat.isConstantMetadataAttr(metadataCol.name)) {
+ classOf[ConstantColumnVector].getName
+ } else if
(relation.sparkSession.sessionState.conf.offHeapColumnVectorEnabled) {
+ classOf[OffHeapColumnVector].getName
+ } else {
+ classOf[OnHeapColumnVector].getName
+ }
+ }
}
lazy val driverMetrics = Map(
@@ -690,7 +699,10 @@ case class FileSourceScanExec(
if (shouldProcess(filePath)) {
val isSplitable = relation.fileFormat.isSplitable(
- relation.sparkSession, relation.options, filePath)
+ relation.sparkSession, relation.options, filePath) &&
+ // SPARK-39634: Allow file splitting in combination with row index
generation once
+ // the fix for PARQUET-2161 is available.
+ !RowIndexUtil.isNeededForSchema(requiredSchema)
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 8f8846b89f3..d50fd88f65c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -111,7 +111,7 @@ case class DataSource(
}
}
- private def providingInstance() =
providingClass.getConstructor().newInstance()
+ private[sql] def providingInstance(): Any =
providingClass.getConstructor().newInstance()
private def newHadoopConfiguration(): Configuration =
sparkSession.sessionState.newHadoopConfWithOptions(options)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index f9b37fb5d9f..f7f917d8947 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, LongType, StringType,
StructField, StructType, TimestampType}
@@ -182,18 +183,32 @@ object FileFormat {
val FILE_MODIFICATION_TIME = "file_modification_time"
+ val ROW_INDEX = "row_index"
+
+ // A name for a temporary column that holds row indexes computed by the file
format reader
+ // until they can be placed in the _metadata struct.
+ val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
val METADATA_NAME = "_metadata"
- // supported metadata struct fields for hadoop fs relation
- val METADATA_STRUCT: StructType = new StructType()
- .add(StructField(FILE_PATH, StringType))
- .add(StructField(FILE_NAME, StringType))
- .add(StructField(FILE_SIZE, LongType))
- .add(StructField(FILE_MODIFICATION_TIME, TimestampType))
+ /** Schema of metadata struct that can be produced by every file format. */
+ val BASE_METADATA_STRUCT: StructType = new StructType()
+ .add(StructField(FileFormat.FILE_PATH, StringType))
+ .add(StructField(FileFormat.FILE_NAME, StringType))
+ .add(StructField(FileFormat.FILE_SIZE, LongType))
+ .add(StructField(FileFormat.FILE_MODIFICATION_TIME, TimestampType))
- // create a file metadata struct col
- def createFileMetadataCol: AttributeReference =
- FileSourceMetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+ /**
+ * Create a file metadata struct column containing fields supported by the
given file format.
+ */
+ def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
+ val struct = if (fileFormat.isInstanceOf[ParquetFileFormat]) {
+ BASE_METADATA_STRUCT.add(StructField(FileFormat.ROW_INDEX, LongType))
+ } else {
+ BASE_METADATA_STRUCT
+ }
+ FileSourceMetadataAttribute(FileFormat.METADATA_NAME, struct)
+ }
// create an internal row given required metadata fields and file information
def createMetadataInternalRow(
@@ -220,10 +235,23 @@ object FileFormat {
// the modificationTime from the file is in millisecond,
// while internally, the TimestampType `file_modification_time` is
stored in microsecond
row.update(i, fileModificationTime * 1000L)
+ case ROW_INDEX =>
+ // Do nothing. Only the metadata fields that have identical values
for each row of the
+ // file are set by this function, while fields that have different
values (such as row
+ // index) are set separately.
}
}
row
}
+
+ /**
+ * Returns true if the given metadata column always contains identical
values for all rows
+ * originating from the same data file.
+ */
+ def isConstantMetadataAttr(name: String): Boolean = name match {
+ case FILE_PATH | FILE_NAME | FILE_SIZE | FILE_MODIFICATION_TIME => true
+ case ROW_INDEX => false
+ }
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 4c3f5629e78..827d41dd096 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.FileFormat._
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
import org.apache.spark.sql.types.{LongType, StringType, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator
@@ -133,8 +133,9 @@ class FileScanRDD(
}
/**
- * For each partitioned file, metadata columns for each record in the
file are exactly same.
- * Only update metadata row when `currentFile` is changed.
+ * The value of some of the metadata columns remains exactly the same
for each record of
+ * a partitioned file. Only need to update their values in the metadata
row when `currentFile`
+ * is changed.
*/
private def updateMetadataRow(): Unit =
if (metadataColumns.nonEmpty && currentFile != null) {
@@ -145,7 +146,7 @@ class FileScanRDD(
/**
* Create an array of constant column vectors containing all required
metadata columns
*/
- private def createMetadataColumnVector(c: ColumnarBatch):
Array[ConstantColumnVector] = {
+ private def createMetadataColumnVector(c: ColumnarBatch):
Array[ColumnVector] = {
val path = new Path(currentFile.filePath)
metadataColumns.map(_.name).map {
case FILE_PATH =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4995a0d6cd4..22ad7960cdd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.util.Locale
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -26,7 +28,7 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
-import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
+import org.apache.spark.sql.types.{DoubleType, FloatType, LongType, StructType}
import org.apache.spark.util.collection.BitSet
/**
@@ -206,13 +208,6 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq
++ projects
val requiredAttributes = AttributeSet(requiredExpressions)
- val readDataColumns =
- dataColumns
- .filter(requiredAttributes.contains)
- .filterNot(partitionColumns.contains)
- val outputSchema = readDataColumns.toStructType
- logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
-
val metadataStructOpt = l.output.collectFirst {
case FileSourceMetadataAttribute(attr) => attr
}
@@ -223,14 +218,45 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
}.toSeq
}.getOrElse(Seq.empty)
- // outputAttributes should also include the metadata columns at the very
end
- val outputAttributes = readDataColumns ++ partitionColumns ++
metadataColumns
+ val fileConstantMetadataColumns: Seq[Attribute] =
+ metadataColumns.filter(_.name != FileFormat.ROW_INDEX)
+
+ val readDataColumns = dataColumns
+ .filter(requiredAttributes.contains)
+ .filterNot(partitionColumns.contains)
+
+ val fileFormatReaderGeneratedMetadataColumns: Seq[Attribute] =
+ metadataColumns.map(_.name).flatMap {
+ case FileFormat.ROW_INDEX =>
+ if ((readDataColumns ++
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
+ .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
+ throw new
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
+ " is a reserved column name that cannot be read in combination
with " +
+ s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column.")
+ }
+
Some(AttributeReference(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)())
+ case _ => None
+ }
+
+ val outputDataSchema = (readDataColumns ++
fileFormatReaderGeneratedMetadataColumns)
+ .toStructType
+
+ // The output rows will be produced during file scan operation in three
steps:
+ // (1) File format reader populates a `Row` with `readDataColumns` and
+ // `fileFormatReaderGeneratedMetadataColumns`
+ // (2) Then, a row containing `partitionColumns` is joined at the end.
+ // (3) Finally, a row containing `fileConstantMetadataColumns` is also
joined at the end.
+ // By placing `fileFormatReaderGeneratedMetadataColumns` before
`partitionColumns` and
+ // `fileConstantMetadataColumns` in the `outputAttributes` we make these
row operations
+ // simpler and more efficient.
+ val outputAttributes = readDataColumns ++
fileFormatReaderGeneratedMetadataColumns ++
+ partitionColumns ++ fileConstantMetadataColumns
val scan =
FileSourceScanExec(
fsRelation,
outputAttributes,
- outputSchema,
+ outputDataSchema,
partitionKeyFilters.toSeq,
bucketSet,
None,
@@ -239,10 +265,20 @@ object FileSourceStrategy extends Strategy with
PredicateHelper with Logging {
// extra Project node: wrap flat metadata columns to a metadata struct
val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
+ val structColumns = metadataColumns.map { col => col.name match {
+ case FileFormat.FILE_PATH | FileFormat.FILE_NAME |
FileFormat.FILE_SIZE |
+ FileFormat.FILE_MODIFICATION_TIME =>
+ col
+ case FileFormat.ROW_INDEX =>
+ fileFormatReaderGeneratedMetadataColumns
+ .find(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
+ .get.withName(FileFormat.ROW_INDEX)
+ }
+ }
val metadataAlias =
- Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId =
metadataStruct.exprId)
+ Alias(CreateStruct(structColumns), METADATA_NAME)(exprId =
metadataStruct.exprId)
execution.ProjectExec(
- scan.output.dropRight(metadataColumns.length) :+ metadataAlias, scan)
+ readDataColumns ++ partitionColumns :+ metadataAlias, scan)
}.getOrElse(scan)
val afterScanFilter =
afterScanFilters.toSeq.reduceOption(expressions.And)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 291b98fb37c..43699c1b6b1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -69,7 +69,7 @@ case class LogicalRelation(
}
override lazy val metadataOutput: Seq[AttributeReference] = relation match {
- case _: HadoopFsRelation =>
+ case relation: HadoopFsRelation =>
val resolve = conf.resolver
val outputNames = outputSet.map(_.name)
def isOutputColumn(col: AttributeReference): Boolean = {
@@ -78,7 +78,7 @@ case class LogicalRelation(
// filter out the metadata struct column if it has the name conflicting
with output columns.
// if the file has a column "_metadata",
// then the data column should be returned not the metadata struct column
- Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+
Seq(FileFormat.createFileMetadataCol(relation.fileFormat)).filterNot(isOutputColumn)
case _ => Nil
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala
new file mode 100644
index 00000000000..1512b6da1e8
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RowIndexUtil.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+
+object RowIndexUtil {
+ def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
+ sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) =>
+ field.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ } match {
+ case Some((field: StructField, idx: Int)) =>
+ if (field.dataType != LongType) {
+ throw new
RuntimeException(s"${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} must be of " +
+ "LongType")
+ }
+ idx
+ case _ => -1
+ }
+ }
+
+ def isNeededForSchema(sparkSchema: StructType): Boolean = {
+ findRowIndexColumnIndexInSchema(sparkSchema) >= 0
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 513379d23d6..c20063333c5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -366,9 +366,11 @@ class ParquetFileFormat
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
- val iter = new RecordReaderIterator[InternalRow](reader)
+ val readerWithRowIndexes =
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+ requiredSchema)
+ val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
try {
- reader.initialize(split, hadoopAttemptContext)
+ readerWithRowIndexes.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++
partitionSchema.toAttributes
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
new file mode 100644
index 00000000000..fb1f6a417f4
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexUtil.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader,
TaskAttemptContext}
+import org.apache.parquet.column.page.PageReadStore
+import org.apache.parquet.hadoop.ParquetRecordReader
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, RowIndexUtil}
+import
org.apache.spark.sql.execution.datasources.RowIndexUtil.findRowIndexColumnIndexInSchema
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
+import org.apache.spark.sql.types.StructType
+
+
+object ParquetRowIndexUtil {
+ /**
+ * Generate row indexes for vectorized readers.
+ */
+ class RowIndexGenerator(rowIndexColumnIdx: Int) {
+ var rowIndexIterator: Iterator[Long] = _
+
+ /** For Parquet only: initialize the generator using provided
PageReadStore. */
+ def initFromPageReadStore(pages: PageReadStore): Unit = {
+ if (!pages.getRowIndexOffset.isPresent) {
+ throw new IOException("PageReadStore returned no row index offset.")
+ }
+ val startingRowIdx: Long = pages.getRowIndexOffset.get()
+ if (pages.getRowIndexes.isPresent) {
+ // The presence of `getRowIndexes` indicates that page skipping is
effective and only
+ // a subset of rows in the row group is going to be read. Note that
there is a name
+ // collision here: these row indexes (unlike ones this class is
generating) are counted
+ // starting from 0 in each of the row groups.
+ rowIndexIterator = pages.getRowIndexes.get.asScala.map(idx => idx +
startingRowIdx)
+ } else {
+ val numRowsInRowGroup = pages.getRowCount
+ rowIndexIterator = (startingRowIdx until startingRowIdx +
numRowsInRowGroup).iterator
+ }
+ }
+
+ def populateRowIndex(columnVectors: Array[ParquetColumnVector], numRows:
Int): Unit = {
+ populateRowIndex(columnVectors(rowIndexColumnIdx).getValueVector,
numRows)
+ }
+
+ def populateRowIndex(columnVector: WritableColumnVector, numRows: Int):
Unit = {
+ for (i <- 0 until numRows) {
+ columnVector.putLong(i, rowIndexIterator.next())
+ }
+ }
+ }
+
+ def createGeneratorIfNeeded(sparkSchema: StructType): RowIndexGenerator = {
+ val columnIdx = findRowIndexColumnIndexInSchema(sparkSchema)
+ if (columnIdx >= 0) new RowIndexGenerator(columnIdx)
+ else null
+ }
+
+ /**
+ * A wrapper for `ParquetRecordReader` that sets row index column to the
correct value in
+ * the returned InternalRow. Used in combination with non-vectorized
(parquet-mr) Parquet reader.
+ */
+ private class RecordReaderWithRowIndexes(
+ parent: ParquetRecordReader[InternalRow],
+ rowIndexColumnIdx: Int)
+ extends RecordReader[Void, InternalRow] {
+
+ override def initialize(
+ inputSplit: InputSplit,
+ taskAttemptContext: TaskAttemptContext): Unit = {
+ parent.initialize(inputSplit, taskAttemptContext)
+ }
+
+ override def nextKeyValue(): Boolean = parent.nextKeyValue()
+
+ override def getCurrentKey: Void = parent.getCurrentKey
+
+ override def getCurrentValue: InternalRow = {
+ val row = parent.getCurrentValue
+ row.setLong(rowIndexColumnIdx, parent.getCurrentRowIndex)
+ row
+ }
+
+ override def getProgress: Float = parent.getProgress
+
+ override def close(): Unit = parent.close()
+ }
+
+ def addRowIndexToRecordReaderIfNeeded(
+ reader: ParquetRecordReader[InternalRow],
+ sparkSchema: StructType): RecordReader[Void, InternalRow] = {
+ val rowIndexColumnIdx =
RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
+ if (rowIndexColumnIdx >= 0) {
+ new RecordReaderWithRowIndexes(reader, rowIndexColumnIdx)
+ } else {
+ reader
+ }
+ }
+
+ def isRowIndexColumn(column: ParquetColumn): Boolean = {
+ column.path.length == 1 && column.path.last ==
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 0f6e5201df8..121ebe1cfa2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -295,10 +295,12 @@ case class ParquetPartitionReaderFactory(
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
- val iter = new RecordReaderIterator(reader)
+ val readerWithRowIndexes =
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(
+ reader, readDataSchema)
+ val iter = new RecordReaderIterator(readerWithRowIndexes)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
- reader
+ readerWithRowIndexes
}
private def createVectorizedReader(file: PartitionedFile):
VectorizedParquetRecordReader = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 0457e8be715..ff0b38880fd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.read.PartitionReaderFactory
-import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex, RowIndexUtil}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions,
ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.internal.SQLConf
@@ -50,7 +50,10 @@ case class ParquetScan(
override def isSplitable(path: Path): Boolean = {
// If aggregate is pushed down, only the file footer will be read once,
// so file should not be split across multiple tasks.
- pushedAggregate.isEmpty
+ pushedAggregate.isEmpty &&
+ // SPARK-39634: Allow file splitting in combination with row index
generation once
+ // the fix for PARQUET-2161 is available.
+ !RowIndexUtil.isNeededForSchema(readSchema)
}
override def readSchema(): StructType = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 7b177f3b67d..af90b692a70 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -70,7 +70,8 @@ case class StreamingRelation(dataSource: DataSource,
sourceName: String, output:
// filter out the metadata struct column if it has the name
conflicting with output columns.
// if the file has a column "_metadata",
// then the data column should be returned not the metadata struct
column
- Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+ Seq(FileFormat.createFileMetadataCol(
+
dataSource.providingInstance().asInstanceOf[FileFormat])).filterNot(isOutputColumn)
case _ => Nil
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala
new file mode 100644
index 00000000000..38fff24abe5
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MetadataStructBenchmark.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+
+object MetadataStructBenchmark extends SqlBasedBenchmark {
+ import spark.implicits._
+
+ private val NUM_ROWS = 5000000
+ private val NUM_ITERS = 32
+
+ private def withTempData(format: String = "parquet")(f: DataFrame => Unit):
Unit = {
+ val dir = Utils.createTempDir()
+ dir.delete()
+ try {
+ spark.range(0, NUM_ROWS, 1, 1).toDF("id")
+ .withColumn("num1", $"id" + 10)
+ .withColumn("num2", $"id" / 10)
+ .withColumn("str", lit("a sample string ") + $"id".cast("string"))
+ .write.format(format).save(dir.getAbsolutePath)
+ val df = spark.read.format(format).load(dir.getAbsolutePath)
+ f(df)
+ } finally {
+ Utils.deleteRecursively(dir)
+ }
+ }
+
+ private def addCase(benchmark: Benchmark, df: DataFrame, metadataCol:
Option[String]): Unit = {
+ benchmark.addCase(metadataCol.getOrElse("no metadata columns")) { _ =>
+ df.select("*", metadataCol.toSeq: _*).noop()
+ }
+ }
+
+ private def metadataBenchmark(name: String, format: String): Unit = {
+ withTempData(format) { df =>
+ val metadataCols = df.select(FileFormat.METADATA_NAME).schema
+ .fields.head.dataType.asInstanceOf[StructType].fieldNames
+
+ val benchmark = new Benchmark(name, NUM_ROWS, NUM_ITERS, output = output)
+
+ addCase(benchmark, df, None)
+ for (metadataCol <- metadataCols) {
+ addCase(benchmark, df,
Some(s"${FileFormat.METADATA_NAME}.$metadataCol"))
+ }
+ addCase(benchmark, df, Some(FileFormat.METADATA_NAME))
+
+ benchmark.run()
+ }
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ runBenchmark("Metadata Struct Benchmark") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+ metadataBenchmark("Vectorized Parquet", "parquet")
+ }
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+ metadataBenchmark("Parquet-mr", "parquet")
+ }
+ metadataBenchmark("JSON", "json")
+ }
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
new file mode 100644
index 00000000000..af2d56159bf
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class FileMetadataStructRowIndexSuite extends QueryTest with
SharedSparkSession {
+ import testImplicits._
+
+ val EXPECTED_ROW_ID_COL = "expected_row_idx"
+ val EXPECTED_EXTRA_COL = "expected_extra_col"
+ val EXPECTED_PARTITION_COL = "experted_pb_col"
+ val NUM_ROWS = 100
+
+ def withReadDataFrame(
+ format: String,
+ partitionCol: String = null,
+ extraCol: String = "ec",
+ extraSchemaFields: Seq[StructField] = Seq.empty)
+ (f: DataFrame => Unit): Unit = {
+ withTempPath { path =>
+ val baseDf = spark.range(0, NUM_ROWS, 1, 1).toDF("id")
+ .withColumn(extraCol, $"id" + lit(1000 * 1000))
+ .withColumn(EXPECTED_EXTRA_COL, col(extraCol))
+ val writeSchema: StructType = if (partitionCol != null) {
+ val writeDf = baseDf
+ .withColumn(partitionCol, ($"id" / 10).cast("int") + lit(1000))
+ .withColumn(EXPECTED_PARTITION_COL, col(partitionCol))
+ .withColumn(EXPECTED_ROW_ID_COL, $"id" % 10)
+
writeDf.write.format(format).partitionBy(partitionCol).save(path.getAbsolutePath)
+ writeDf.schema
+ } else {
+ val writeDf = baseDf
+ .withColumn(EXPECTED_ROW_ID_COL, $"id")
+ writeDf.write.format(format).save(path.getAbsolutePath)
+ writeDf.schema
+ }
+ val readSchema: StructType = new StructType(writeSchema.fields ++
extraSchemaFields)
+ val readDf =
spark.read.format(format).schema(readSchema).load(path.getAbsolutePath)
+ f(readDf)
+ }
+ }
+
+ private val allMetadataCols = Seq(
+ FileFormat.FILE_PATH,
+ FileFormat.FILE_SIZE,
+ FileFormat.FILE_MODIFICATION_TIME,
+ FileFormat.ROW_INDEX
+ )
+
+ /** Identifies the names of all the metadata columns present in the schema.
*/
+ private def collectMetadataCols(struct: StructType): Seq[String] = {
+ struct.fields.flatMap { field => field.dataType match {
+ case s: StructType => collectMetadataCols(s)
+ case _ if allMetadataCols.contains(field.name) => Some(field.name)
+ case _ => None
+ }}
+ }
+
+ for (useVectorizedReader <- Seq(false, true))
+ for (useOffHeapMemory <- Seq(useVectorizedReader, false).distinct)
+ for (partitioned <- Seq(false, true)) {
+ val label = Seq(
+ { if (useVectorizedReader) "vectorized" else "parquet-mr"},
+ { if (useOffHeapMemory) "off-heap" else "" },
+ { if (partitioned) "partitioned" else "" }
+ ).filter(_.nonEmpty).mkString(", ")
+ test(s"parquet ($label) - read _metadata.row_index") {
+ withSQLConf(
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
useVectorizedReader.toString,
+ SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key ->
useOffHeapMemory.toString) {
+ withReadDataFrame("parquet", partitionCol = "pb") { df =>
+ val res = df.select("*",
s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+ .where(s"$EXPECTED_ROW_ID_COL != ${FileFormat.ROW_INDEX}")
+ assert(res.count() == 0)
+ }
+ }
+ }
+ }
+
+ test("supported file format - read _metadata struct") {
+ withReadDataFrame("parquet") { df =>
+ val withMetadataStruct = df.select("*", FileFormat.METADATA_NAME)
+
+ // `_metadata.row_index` column is present when selecting `_metadata` as
a whole.
+ val metadataCols = collectMetadataCols(withMetadataStruct.schema)
+ assert(metadataCols.contains(FileFormat.ROW_INDEX))
+ }
+ }
+
+ test("unsupported file format - read _metadata struct") {
+ withReadDataFrame("orc") { df =>
+ val withMetadataStruct = df.select("*", FileFormat.METADATA_NAME)
+
+ // Metadata struct can be read without an error.
+ withMetadataStruct.collect()
+
+ // Schema does not contain row index column, but contains all the
remaining metadata columns.
+ val metadataCols = collectMetadataCols(withMetadataStruct.schema)
+ assert(!metadataCols.contains(FileFormat.ROW_INDEX))
+ assert(allMetadataCols.intersect(metadataCols).size ==
allMetadataCols.size - 1)
+ }
+ }
+
+ test("unsupported file format - read _metadata.row_index") {
+ withReadDataFrame("orc") { df =>
+ val ex = intercept[AnalysisException] {
+ df.select("*", s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+ }
+ assert(ex.getMessage.contains("No such struct field row_index"))
+ }
+ }
+
+ for (useVectorizedReader <- Seq(true, false)) {
+ val label = if (useVectorizedReader) "vectorized" else "parquet-mr"
+ test(s"parquet ($label) - use mixed case for column name") {
+ withSQLConf(
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
useVectorizedReader.toString) {
+ withReadDataFrame("parquet") { df =>
+ val mixedCaseRowIndex = "RoW_InDeX"
+ assert(mixedCaseRowIndex.toLowerCase() == FileFormat.ROW_INDEX)
+
+ assert(df.select("*",
s"${FileFormat.METADATA_NAME}.$mixedCaseRowIndex")
+ .where(s"$EXPECTED_ROW_ID_COL != $mixedCaseRowIndex")
+ .count == 0)
+ }
+ }
+ }
+ }
+
+ test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - not present
in a table") {
+ // File format supporting row index generation populates the column with
row indexes.
+ withReadDataFrame("parquet", extraSchemaFields =
+ Seq(StructField(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
LongType))) { df =>
+ assert(df
+ .where(col(EXPECTED_ROW_ID_COL) ===
col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ .count == NUM_ROWS)
+ }
+
+ // File format not supporting row index generation populates missing
column with nulls.
+ withReadDataFrame("json", extraSchemaFields =
+ Seq(StructField(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
LongType))) { df =>
+ assert(df
+ .where(col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME).isNull)
+ .count == NUM_ROWS)
+ }
+ }
+
+ test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - present in a
table") {
+ withReadDataFrame("parquet", extraCol =
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
+ // Values of FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME column are
always populated with
+ // generated row indexes, rather than read from the file.
+ // TODO(SPARK-40059): Allow users to include columns named
+ // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in
their schemas.
+ assert(df
+ .where(col(EXPECTED_ROW_ID_COL) ===
col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ .count == NUM_ROWS)
+
+ // Column cannot be read in combination with _metadata.row_index.
+ intercept[AnalysisException](df.select("*",
FileFormat.METADATA_NAME).collect())
+ intercept[AnalysisException](df
+ .select("*",
s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}").collect())
+ }
+ }
+
+ test(s"reading ${FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} - as partition
col") {
+ withReadDataFrame("parquet", partitionCol =
FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) { df =>
+ // Column values are set for each partition, rather than populated with
generated row indexes.
+ assert(df
+ .where(col(EXPECTED_PARTITION_COL) ===
col(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ .count == NUM_ROWS)
+
+ // Column cannot be read in combination with _metadata.row_index.
+ intercept[AnalysisException](df.select("*",
FileFormat.METADATA_NAME).collect())
+ intercept[AnalysisException](df
+ .select("*",
s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}").collect())
+ }
+ }
+
+ test(s"cannot make ${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} a
partition column") {
+ withTempPath { srcPath =>
+ spark.range(0, 10, 1,
1).toDF("id").write.parquet(srcPath.getAbsolutePath)
+
+ withTempPath { dstPath =>
+ intercept[AnalysisException] {
+ spark.read.parquet(srcPath.getAbsolutePath)
+ .select("*", FileFormat.METADATA_NAME)
+ .write
+
.partitionBy(s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+ .save(dstPath.getAbsolutePath)
+ }
+ }
+ }
+ }
+
+ test(s"read user created ${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}
column") {
+ withReadDataFrame("parquet", partitionCol = "pb") { df =>
+ withTempPath { dir =>
+ // The `df` has 10 input files with 10 rows each. Therefore the
`_metadata.row_index` values
+ // will be { 10 x 0, 10 x 1, ..., 10 x 9 }. We store all these values
in a single file.
+ df.select("id", s"${FileFormat.METADATA_NAME}")
+ .coalesce(1)
+ .write.parquet(dir.getAbsolutePath)
+
+ assert(spark
+ .read.parquet(dir.getAbsolutePath)
+ .count == NUM_ROWS)
+
+ // The _metadata.row_index is returning data from the file, not
generated metadata.
+ assert(spark
+ .read.parquet(dir.getAbsolutePath)
+ .select(s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX}")
+ .distinct.count == NUM_ROWS / 10)
+ }
+ }
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 6afea42ee83..2c56adbab94 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -59,6 +59,30 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
private val METADATA_FILE_MODIFICATION_TIME =
"_metadata.file_modification_time"
+ private val METADATA_ROW_INDEX = "_metadata.row_index"
+
+ private val FILE_FORMAT = "fileFormat"
+
+ private def getMetadataRow(f: Map[String, Any]): Row = f(FILE_FORMAT) match {
+ case "parquet" =>
+ Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE),
+ f(METADATA_FILE_MODIFICATION_TIME), f(METADATA_ROW_INDEX))
+ case _ =>
+ Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE),
+ f(METADATA_FILE_MODIFICATION_TIME))
+ }
+
+ private def getMetadataForFile(f: File): Map[String, Any] = {
+ Map(
+ METADATA_FILE_PATH -> f.toURI.toString,
+ METADATA_FILE_NAME -> f.getName,
+ METADATA_FILE_SIZE -> f.length(),
+ METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified()),
+ METADATA_ROW_INDEX -> 0,
+ FILE_FORMAT -> f.getName.split("\\.").last
+ )
+ }
+
/**
* This test wrapper will test for both row-based and column-based file
formats:
* (json and parquet) with nested schema:
@@ -101,21 +125,7 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
val realF1 = new File(dir, "data/f1").listFiles()
.filter(_.getName.endsWith(s".$testFileFormat")).head
- // 3. create f0 and f1 metadata data
- val f0Metadata = Map(
- METADATA_FILE_PATH -> realF0.toURI.toString,
- METADATA_FILE_NAME -> realF0.getName,
- METADATA_FILE_SIZE -> realF0.length(),
- METADATA_FILE_MODIFICATION_TIME -> new
Timestamp(realF0.lastModified())
- )
- val f1Metadata = Map(
- METADATA_FILE_PATH -> realF1.toURI.toString,
- METADATA_FILE_NAME -> realF1.getName,
- METADATA_FILE_SIZE -> realF1.length(),
- METADATA_FILE_MODIFICATION_TIME -> new
Timestamp(realF1.lastModified())
- )
-
- f(df, f0Metadata, f1Metadata)
+ f(df, getMetadataForFile(realF0), getMetadataForFile(realF1))
}
}
}
@@ -232,10 +242,8 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
checkAnswer(
df.select("_metadata"),
Seq(
- Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
- f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
- Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
- f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+ Row(getMetadataRow(f0)),
+ Row(getMetadataRow(f1))
)
)
}
@@ -348,11 +356,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
df.select("name", "age", "_METADATA", "_metadata"),
Seq(
Row("jack", 24, Row(12345L, "uom"),
- Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
- f0(METADATA_FILE_SIZE),
f0(METADATA_FILE_MODIFICATION_TIME))),
+ getMetadataRow(f0)),
Row("lily", 31, Row(54321L, "ucb"),
- Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
- f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+ getMetadataRow(f1))
)
)
} else {
@@ -492,12 +498,8 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
checkAnswer(
newDF.select("*"),
Seq(
- Row("jack", 24, Row(12345L, "uom"),
- Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
- f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
- Row("lily", 31, Row(54321L, "ucb"),
- Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
- f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+ Row("jack", 24, Row(12345L, "uom"), getMetadataRow(f0)),
+ Row("lily", 31, Row(54321L, "ucb"), getMetadataRow(f1))
)
)
@@ -505,10 +507,8 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
checkAnswer(
newDF.select("_metadata"),
Seq(
- Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
- f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
- Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
- f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+ Row(getMetadataRow(f0)),
+ Row(getMetadataRow(f1))
)
)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
new file mode 100644
index 00000000000..c36ab49b5e3
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.ParquetProperties._
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
+import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
+import org.apache.spark.sql.functions.{col, max, min}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StringType}
+
+class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
+ import testImplicits._
+
+ private def readRowGroupRowCounts(path: String): Seq[Long] = {
+ ParquetFileReader.readFooter(spark.sessionState.newHadoopConf(), new
Path(path))
+ .getBlocks.asScala.toSeq.map(_.getRowCount)
+ }
+
+ private def readRowGroupRowCounts(dir: File): Seq[Seq[Long]] = {
+ assert(dir.isDirectory)
+ dir.listFiles()
+ .filter { f => f.isFile && f.getName.endsWith("parquet") }
+ .map { f => readRowGroupRowCounts(f.getAbsolutePath) }
+ }
+
+ /**
+ * Do the files contain exactly one row group?
+ */
+ private def assertOneRowGroup(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach { rcs =>
+ assert(rcs.length == 1, "expected one row group per file")
+ }
+ }
+
+ /**
+   * Do the files have a good layout to test row group skipping (both with the range
+   * metadata filter and by using min/max)?
+ */
+ private def assertTinyRowGroups(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach { rcs =>
+ assert(rcs.length > 1, "expected multiple row groups per file")
+ assert(rcs.last <= DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
+      assert(rcs.reverse.tail.distinct == Seq(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK),
+ "expected row groups with minimal row count")
+ }
+ }
+
+ /**
+   * Do the files have a good layout to test a combination of page skipping and row group
+   * skipping?
+ */
+ private def assertIntermediateRowGroups(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach { rcs =>
+ assert(rcs.length >= 3, "expected at least 3 row groups per file")
+ rcs.reverse.tail.foreach { rc =>
+ assert(rc > DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+ "expected row groups larger than minimal row count")
+ }
+ }
+ }
+
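+  /**
+   * Knobs controlling how the test data is written (number of files, row group and page
+   * sizes) and read back (vectorized or parquet-mr reader, DSv1 or DSv2, filters, splits).
+   */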
+ case class RowIndexTestConf(
+ numRows: Long = 10000L,
+ useMultipleFiles: Boolean = false,
+ useVectorizedReader: Boolean = true,
+ useSmallPages: Boolean = false,
+ useSmallRowGroups: Boolean = false,
+ useSmallSplits: Boolean = false,
+ useFilter: Boolean = false,
+ useDataSourceV2: Boolean = false) {
+
+ val NUM_MULTIPLE_FILES = 4
+    // The test doesn't work correctly if the number of records per file is uneven.
+ assert(!useMultipleFiles || (numRows % NUM_MULTIPLE_FILES == 0))
+
+ def numFiles: Int = if (useMultipleFiles) { NUM_MULTIPLE_FILES } else { 1 }
+
+ def rowGroupSize: Long = if (useSmallRowGroups) {
+ if (useSmallPages) {
+      // Each file will contain multiple row groups. All of them (except for the last one)
+      // will contain more than DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, so that individual
+ // pages within the row group can be skipped.
+ 2048L
+ } else {
+      // Each file will contain multiple row groups. All of them (except for the last one)
+ // will contain exactly DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+ 64L
+ }
+ } else {
+ // Each file will contain a single row group.
+ DEFAULT_BLOCK_SIZE
+ }
+
+ def pageSize: Long = if (useSmallPages) {
+      // Each page (except for the last one for each column) will contain exactly
+ // DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+ 64L
+ } else {
+ DEFAULT_PAGE_SIZE
+ }
+
+ def writeFormat: String = "parquet"
+ def readFormat: String = if (useDataSourceV2) {
+ classOf[ParquetDataSourceV2].getCanonicalName
+ } else {
+ "parquet"
+ }
+
+ assert(useSmallRowGroups || !useSmallSplits)
+ def filesMaxPartitionBytes: Long = if (useSmallSplits) {
+ 256L
+ } else {
+ SQLConf.FILES_MAX_PARTITION_BYTES.defaultValue.get
+ }
+
+ def desc: String = {
+      { if (useVectorizedReader) Seq("vectorized reader") else Seq("parquet-mr reader") } ++
+ { if (useMultipleFiles) Seq("many files") else Seq.empty[String] } ++
+ { if (useFilter) Seq("filtered") else Seq.empty[String] } ++
+ { if (useSmallPages) Seq("small pages") else Seq.empty[String] } ++
+      { if (useSmallRowGroups) Seq("small row groups") else Seq.empty[String] } ++
+ { if (useSmallSplits) Seq("small splits") else Seq.empty[String] } ++
+ { if (useDataSourceV2) Seq("datasource v2") else Seq.empty[String] }
+ }.mkString(", ")
+
+ def sqlConfs: Seq[(String, String)] = Seq(
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString,
+ SQLConf.FILES_MAX_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString
+    ) ++ { if (useDataSourceV2) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "") else Seq.empty }
+ }
+
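+  // Generate one test case per combination of reader implementation, data source API,
+  // file layout, filtering, and split configuration.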
+ for (useVectorizedReader <- Seq(true, false))
+ for (useDataSourceV2 <- Seq(true, false))
+ for (useSmallRowGroups <- Seq(true, false))
+ for (useSmallPages <- Seq(true, false))
+ for (useFilter <- Seq(true, false))
+ for (useSmallSplits <- Seq(useSmallRowGroups, false).distinct) {
+ val conf = RowIndexTestConf(useVectorizedReader = useVectorizedReader,
+ useDataSourceV2 = useDataSourceV2, useSmallRowGroups = useSmallRowGroups,
+ useSmallPages = useSmallPages, useFilter = useFilter,
+ useSmallSplits = useSmallSplits)
+ testRowIndexGeneration("row index generation", conf)
+ }
+
+  private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
+ test (s"$label - ${conf.desc}") {
+ withSQLConf(conf.sqlConfs: _*) {
+ withTempPath { path =>
+ val rowIndexColName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ val numRecordsPerFile = conf.numRows / conf.numFiles
+          val (skipCentileFirst, skipCentileMidLeft, skipCentileMidRight, skipCentileLast) =
+ (0.2, 0.4, 0.6, 0.8)
+ val expectedRowIdxCol = "expected_rowIdx_col"
+ val df = spark.range(0, conf.numRows, 1, conf.numFiles).toDF("id")
+ .withColumn("dummy_col", ($"id" / 55).cast("int"))
+            .withColumn(expectedRowIdxCol, ($"id" % numRecordsPerFile).cast("int"))
+
+ // With row index in schema.
+          val schemaWithRowIdx = df.schema.add(rowIndexColName, LongType, nullable = true)
+
+ df.write
+ .format(conf.writeFormat)
+ .option(ParquetOutputFormat.BLOCK_SIZE, conf.rowGroupSize)
+ .option(ParquetOutputFormat.PAGE_SIZE, conf.pageSize)
+ .option(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, conf.pageSize)
+ .save(path.getAbsolutePath)
+ val dfRead = spark.read
+ .format(conf.readFormat)
+ .schema(schemaWithRowIdx)
+ .load(path.getAbsolutePath)
+
+ // Verify that the produced files are laid out as expected.
+ if (conf.useSmallRowGroups) {
+ if (conf.useSmallPages) {
+ assertIntermediateRowGroups(path)
+ } else {
+ assertTinyRowGroups(path)
+ }
+ } else {
+ assertOneRowGroup(path)
+ }
+
+ val dfToAssert = if (conf.useFilter) {
+ // Add a filter such that we skip 60% of the records:
+ // [0%, 20%], [40%, 60%], [80%, 100%]
+ dfRead.filter((
+ $"id" >= (skipCentileFirst * conf.numRows).toInt &&
+ $"id" < (skipCentileMidLeft * conf.numRows).toInt) || (
+ $"id" >= (skipCentileMidRight * conf.numRows).toInt &&
+ $"id" < (skipCentileLast * conf.numRows).toInt))
+ } else {
+ dfRead
+ }
+
+ var numPartitions: Long = 0
+ var numOutputRows: Long = 0
+ dfToAssert.collect()
+ dfToAssert.queryExecution.executedPlan.foreach {
+ case b: BatchScanExec =>
+ numPartitions += b.inputRDD.partitions.length
+ numOutputRows += b.metrics("numOutputRows").value
+ case f: FileSourceScanExec =>
+ numPartitions += f.inputRDD.partitions.length
+ numOutputRows += f.metrics("numOutputRows").value
+ case _ =>
+ }
+ assert(numPartitions > 0)
+ assert(numOutputRows > 0)
+
+ if (conf.useSmallSplits) {
+            // SPARK-39634: Until the fix for PARQUET-2161 is available, it is not
+            // possible to split Parquet files into multiple partitions while
+            // generating row indexes.
+ // assert(numPartitions >= 2 * conf.numFiles)
+ }
+
+          // Assert that every rowIdx value matches the value in `expectedRowIdxCol`.
+ assert(dfToAssert.filter(s"$rowIndexColName != $expectedRowIdxCol")
+ .count() == 0)
+
+ if (conf.useFilter) {
+ if (conf.useSmallRowGroups) {
+ assert(numOutputRows < conf.numRows)
+ }
+
+ val minMaxRowIndexes = dfToAssert.select(
+ max(col(rowIndexColName)),
+ min(col(rowIndexColName))).collect()
+            val (expectedMaxRowIdx, expectedMinRowIdx) = if (conf.numFiles == 1) {
+ // When there is a single file, we still have row group skipping,
+ // but that should not affect the produced rowIdx.
+              (conf.numRows * skipCentileLast - 1, conf.numRows * skipCentileFirst)
+ } else {
+              // For simplicity, the chosen filter skips whole files. Thus all unskipped
+              // files will have the same max and min rowIdx values.
+ (numRecordsPerFile - 1, 0)
+ }
+ assert(minMaxRowIndexes(0).get(0) == expectedMaxRowIdx)
+ assert(minMaxRowIndexes(0).get(1) == expectedMinRowIdx)
+ if (!conf.useMultipleFiles) {
+              val skippedValues = List.range(0, (skipCentileFirst * conf.numRows).toInt) ++
+                List.range((skipCentileMidLeft * conf.numRows).toInt,
+                  (skipCentileMidRight * conf.numRows).toInt) ++
+                List.range((skipCentileLast * conf.numRows).toInt, conf.numRows)
+ // rowIdx column should not have any of the `skippedValues`.
+              assert(dfToAssert
+                .filter(col(rowIndexColName).isin(skippedValues: _*)).count() == 0)
+ }
+ } else {
+ assert(numOutputRows == conf.numRows)
+            // When there is no filter, the rowIdx values should be in the range
+            // [0, `numRecordsPerFile`).
+ val expectedRowIdxValues = List.range(0, numRecordsPerFile)
+            assert(dfToAssert.filter(col(rowIndexColName).isin(expectedRowIdxValues: _*))
+              .count() == conf.numRows)
+ }
+ }
+ }
+ }
+ }
+
+ for (useDataSourceV2 <- Seq(true, false)) {
+ val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
+
+ test(s"invalid row index column type - ${conf.desc}") {
+ withSQLConf(conf.sqlConfs: _*) {
+ withTempPath{ path =>
+ val df = spark.range(0, 10, 1, 1).toDF("id")
+ val schemaWithRowIdx = df.schema
+ .add(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, StringType)
+
+ df.write
+ .format(conf.writeFormat)
+ .save(path.getAbsolutePath)
+
+ val dfRead = spark.read
+ .format(conf.readFormat)
+ .schema(schemaWithRowIdx)
+ .load(path.getAbsolutePath)
+
+ val exception = intercept[Exception](dfRead.collect())
+          assert(exception.getMessage.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ }
+ }
+ }
+ }
+}
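
For readers skimming the patch, the read pattern the new suite exercises can be reduced
to the minimal sketch below. This is not part of the commit: the output path is a
placeholder, a live SparkSession (e.g. spark-shell) is assumed, and the tests above
additionally vary row group, page, and split sizes.

    import org.apache.spark.sql.execution.datasources.FileFormat
    import org.apache.spark.sql.types.LongType

    // Write a small, single-partition Parquet dataset.
    val df = spark.range(0, 100, 1, 1).toDF("id")
    df.write.format("parquet").save("/tmp/row_index_example")

    // Ask Parquet to populate row indexes by appending the temporary row index
    // column to the read schema.
    val schemaWithRowIdx = df.schema
      .add(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = true)
    val withRowIdx = spark.read
      .format("parquet")
      .schema(schemaWithRowIdx)
      .load("/tmp/row_index_example")

    // For a single, unfiltered file the row index simply counts rows from 0.
    withRowIdx.show(5)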
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]