This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a53ecc4a1 [VL] Enable GlutenParqutRowIndexSuite for Spark 3.4/3.5
(#5740)
a53ecc4a1 is described below
commit a53ecc4a1261afe52c659a4333af20c687552f0c
Author: 高阳阳 <[email protected]>
AuthorDate: Wed May 15 14:35:20 2024 +0800
[VL] Enable GlutenParqutRowIndexSuite for Spark 3.4/3.5 (#5740)
---
.../execution/BatchScanExecTransformer.scala | 5 +
.../gluten/utils/velox/VeloxTestSettings.scala | 8 +-
.../parquet/GlutenParquetRowIndexSuite.scala | 335 +++++++++++++++++++-
.../gluten/utils/velox/VeloxTestSettings.scala | 5 +-
.../parquet/GlutenParquetRowIndexSuite.scala | 342 ++++++++++++++++++++-
.../datasources/v2/BatchScanExecShim.scala | 7 +
.../datasources/v2/BatchScanExecShim.scala | 14 +
.../sql/execution/FileSourceScanExecShim.scala | 5 +-
.../datasources/v2/BatchScanExecShim.scala | 20 ++
.../sql/execution/FileSourceScanExecShim.scala | 5 +-
.../datasources/v2/BatchScanExecShim.scala | 20 ++
11 files changed, 756 insertions(+), 10 deletions(-)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 3aeeffae1..b0c8c59e7 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -136,6 +136,11 @@ abstract class BatchScanExecTransformerBase(
if (pushedAggregate.nonEmpty) {
return ValidationResult.notOk(s"Unsupported aggregation push down for
$scan.")
}
+
+ if (hasUnsupportedColumns) {
+ return ValidationResult.notOk(s"Unsupported columns scan in native.")
+ }
+
super.doValidateInternal()
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 498ed5ef4..1afa203ab 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -30,7 +30,7 @@ import
org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParser
import
org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite
import
org.apache.spark.sql.execution.datasources.json.{GlutenJsonLegacyTimeParserSuite,
GlutenJsonV1Suite, GlutenJsonV2Suite}
import
org.apache.spark.sql.execution.datasources.orc.{GlutenOrcColumnarBatchReaderSuite,
GlutenOrcFilterSuite, GlutenOrcPartitionDiscoverySuite, GlutenOrcSourceSuite,
GlutenOrcV1FilterSuite, GlutenOrcV1PartitionDiscoverySuite,
GlutenOrcV1QuerySuite, GlutenOrcV1SchemaPruningSuite, GlutenOrcV2QuerySuite,
GlutenOrcV2SchemaPruningSuite}
-import
org.apache.spark.sql.execution.datasources.parquet.{GlutenParquetColumnIndexSuite,
GlutenParquetCompressionCodecPrecedenceSuite,
GlutenParquetDeltaByteArrayEncodingSuite, GlutenParquetDeltaEncodingInteger,
GlutenParquetDeltaEncodingLong, GlutenParquetDeltaLengthByteArrayEncodingSuite,
GlutenParquetEncodingSuite, GlutenParquetFieldIdIOSuite,
GlutenParquetFileFormatV1Suite, GlutenParquetFileFormatV2Suite,
GlutenParquetInteroperabilitySuite, GlutenParquetIOSuite, GlutenParquetProtobu
[...]
+import
org.apache.spark.sql.execution.datasources.parquet.{GlutenParquetColumnIndexSuite,
GlutenParquetCompressionCodecPrecedenceSuite,
GlutenParquetDeltaByteArrayEncodingSuite, GlutenParquetDeltaEncodingInteger,
GlutenParquetDeltaEncodingLong, GlutenParquetDeltaLengthByteArrayEncodingSuite,
GlutenParquetEncodingSuite, GlutenParquetFieldIdIOSuite,
GlutenParquetFileFormatV1Suite, GlutenParquetFileFormatV2Suite,
GlutenParquetInteroperabilitySuite, GlutenParquetIOSuite, GlutenParquetProtobu
[...]
import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite,
GlutenTextV2Suite}
import
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite,
GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
@@ -1189,9 +1189,9 @@ class VeloxTestSettings extends BackendTestSettings {
// Row index metadata column support in Velox isn't ready yet, refer
velox-9147
.exclude("reading _tmp_metadata_row_index - not present in a table")
.exclude("reading _tmp_metadata_row_index - present in a table")
- // Row index metadata column support in Velox isn't ready yet, refer
velox-9147
- // enableSuite[GlutenParquetRowIndexSuite]
-
+ enableSuite[GlutenParquetRowIndexSuite]
+ .excludeByPrefix("row index generation")
+ .excludeByPrefix("invalid row index column type")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
// scalastyle:on line.size.limit
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
index acf6a2b63..6f153450c 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
@@ -16,6 +16,339 @@
*/
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.gluten.execution.{BatchScanExecTransformer,
FileSourceScanExecTransformer}
+
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
+import org.apache.spark.sql.functions.{col, max, min}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.ParquetProperties._
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with
GlutenSQLTestsBaseTrait {
+ import testImplicits._
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ sparkContext.setLogLevel("info")
+ }
+
+ private def readRowGroupRowCounts(path: String): Seq[Long] = {
+ ParquetFooterReader
+ .readFooter(
+ spark.sessionState.newHadoopConf(),
+ new Path(path),
+ ParquetMetadataConverter.NO_FILTER)
+ .getBlocks
+ .asScala
+ .map(_.getRowCount)
+ }
+
+ private def readRowGroupRowCounts(dir: File): Seq[Seq[Long]] = {
+ assert(dir.isDirectory)
+ dir
+ .listFiles()
+ .filter(f => f.isFile && f.getName.endsWith("parquet"))
+ .map(f => readRowGroupRowCounts(f.getAbsolutePath))
+ }
+
+ /** Do the files contain exactly one row group? */
+ private def assertOneRowGroup(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach {
+ rcs => assert(rcs.length == 1, "expected one row group per file")
+ }
+ }
+
+ /**
+   * Do the files have a good layout to test row group skipping (both via the
range metadata filter and by
+   * using min/max)?
+ */
+ private def assertTinyRowGroups(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach {
+ rcs =>
+ assert(rcs.length > 1, "expected multiple row groups per file")
+ assert(rcs.last <= DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
+ assert(
+ rcs.reverse.tail.distinct ==
Seq(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK),
+ "expected row groups with minimal row count")
+ }
+ }
+
+ /**
+ * Do the files have a good layout to test a combination of page skipping
and row group skipping?
+ */
+ private def assertIntermediateRowGroups(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach {
+ rcs =>
+ assert(rcs.length >= 3, "expected at least 3 row groups per file")
+ rcs.reverse.tail.foreach {
+ rc =>
+ assert(
+ rc > DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+ "expected row groups larger than minimal row count")
+ }
+ }
+ }
+
+ case class GlutenRowIndexTestConf(
+ numRows: Long = 10000L,
+ useMultipleFiles: Boolean = false,
+ useVectorizedReader: Boolean = true,
+ useSmallPages: Boolean = false,
+ useSmallRowGroups: Boolean = false,
+ useSmallSplits: Boolean = false,
+ useFilter: Boolean = false,
+ useDataSourceV2: Boolean = false) {
+
+ val NUM_MULTIPLE_FILES = 4
+ // The test doesn't work correctly if the number of records per file is
uneven.
+ assert(!useMultipleFiles || (numRows % NUM_MULTIPLE_FILES == 0))
+
+ def numFiles: Int = if (useMultipleFiles) { NUM_MULTIPLE_FILES }
+ else { 1 }
+
+ def rowGroupSize: Long = if (useSmallRowGroups) {
+ if (useSmallPages) {
+ // Each file will contain multiple row groups. All of them (except for
the last one)
+ // will contain more than DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, so
that individual
+ // pages within the row group can be skipped.
+ 2048L
+ } else {
+ // Each file will contain multiple row groups. All of them (except for
the last one)
+ // will contain exactly DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+ 64L
+ }
+ } else {
+ // Each file will contain a single row group.
+ DEFAULT_BLOCK_SIZE
+ }
+
+ def pageSize: Long = if (useSmallPages) {
+ // Each page (except for the last one for each column) will contain
exactly
+ // DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+ 64L
+ } else {
+ DEFAULT_PAGE_SIZE
+ }
+
+ def writeFormat: String = "parquet"
+ def readFormat: String = if (useDataSourceV2) {
+ classOf[ParquetDataSourceV2].getCanonicalName
+ } else {
+ "parquet"
+ }
+
+ assert(useSmallRowGroups || !useSmallSplits)
+ def filesMaxPartitionBytes: Long = if (useSmallSplits) {
+ 256L
+ } else {
+ SQLConf.FILES_MAX_PARTITION_BYTES.defaultValue.get
+ }
+
+ def desc: String = {
+ { if (useVectorizedReader) Seq("vectorized reader") else Seq("parquet-mr
reader") } ++ {
+ if (useMultipleFiles) Seq("many files") else Seq.empty[String]
+ } ++ { if (useFilter) Seq("filtered") else Seq.empty[String] } ++ {
+ if (useSmallPages) Seq("small pages") else Seq.empty[String]
+ } ++ { if (useSmallRowGroups) Seq("small row groups") else
Seq.empty[String] } ++ {
+ if (useSmallSplits) Seq("small splits") else Seq.empty[String]
+ } ++ { if (useDataSourceV2) Seq("datasource v2") else Seq.empty[String] }
+ }.mkString(", ")
+
+ def sqlConfs: Seq[(String, String)] = Seq(
+      // TODO: remove this change after customized parquet options such as
`block_size` and `page_size`
+      // have been fully supported.
+ "spark.gluten.sql.native.writer.enabled" -> "false",
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
useVectorizedReader.toString,
+ SQLConf.FILES_MAX_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString
+ ) ++ { if (useDataSourceV2) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "") else
Seq.empty }
+ }
+
+ for (useVectorizedReader <- Seq(true, false))
+ for (useDataSourceV2 <- Seq(true, false))
+ for (useSmallRowGroups <- Seq(true, false))
+ for (useSmallPages <- Seq(true, false))
+ for (useFilter <- Seq(true, false))
+ for (useSmallSplits <- Seq(useSmallRowGroups, false).distinct) {
+ val conf = GlutenRowIndexTestConf(
+ useVectorizedReader = useVectorizedReader,
+ useDataSourceV2 = useDataSourceV2,
+ useSmallRowGroups = useSmallRowGroups,
+ useSmallPages = useSmallPages,
+ useFilter = useFilter,
+ useSmallSplits = useSmallSplits
+ )
+ testRowIndexGeneration("row index generation", conf)
+ }
+
+ private def testRowIndexGeneration(label: String, conf:
GlutenRowIndexTestConf): Unit = {
+ testGluten(s"$label - ${conf.desc}") {
+ withSQLConf(conf.sqlConfs: _*) {
+ withTempPath {
+ path =>
+ val rowIndexColName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ val numRecordsPerFile = conf.numRows / conf.numFiles
+ val (skipCentileFirst, skipCentileMidLeft, skipCentileMidRight,
skipCentileLast) =
+ (0.2, 0.4, 0.6, 0.8)
+ val expectedRowIdxCol = "expected_rowIdx_col"
+ val df = spark
+ .range(0, conf.numRows, 1, conf.numFiles)
+ .toDF("id")
+ .withColumn("dummy_col", ($"id" / 55).cast("int"))
+ .withColumn(expectedRowIdxCol, ($"id" %
numRecordsPerFile).cast("int"))
+
+ // With row index in schema.
+ val schemaWithRowIdx = df.schema.add(rowIndexColName, LongType,
nullable = true)
+
+ df.write
+ .format(conf.writeFormat)
+ .option(ParquetOutputFormat.BLOCK_SIZE, conf.rowGroupSize)
+ .option(ParquetOutputFormat.PAGE_SIZE, conf.pageSize)
+ .option(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, conf.pageSize)
+ .save(path.getAbsolutePath)
+ val dfRead = spark.read
+ .format(conf.readFormat)
+ .schema(schemaWithRowIdx)
+ .load(path.getAbsolutePath)
+
+ // Verify that the produced files are laid out as expected.
+ if (conf.useSmallRowGroups) {
+ if (conf.useSmallPages) {
+ assertIntermediateRowGroups(path)
+ } else {
+ assertTinyRowGroups(path)
+ }
+ } else {
+ assertOneRowGroup(path)
+ }
+
+ val dfToAssert = if (conf.useFilter) {
+ // Add a filter such that we skip 60% of the records:
+ // [0%, 20%], [40%, 60%], [80%, 100%]
+ dfRead.filter(
+ ($"id" >= (skipCentileFirst * conf.numRows).toInt &&
+ $"id" < (skipCentileMidLeft * conf.numRows).toInt) ||
+ ($"id" >= (skipCentileMidRight * conf.numRows).toInt &&
+ $"id" < (skipCentileLast * conf.numRows).toInt))
+ } else {
+ dfRead
+ }
+
+ var numPartitions: Long = 0
+ var numOutputRows: Long = 0
+ dfToAssert.collect()
+ logInfo(dfToAssert.queryExecution.executedPlan.toString())
+ dfToAssert.queryExecution.executedPlan.foreach {
+ case a: BatchScanExec =>
+ numPartitions += a.inputRDD.partitions.length
+ numOutputRows += a.metrics("numOutputRows").value
+ case b: FileSourceScanExec =>
+ numPartitions += b.inputRDD.partitions.length
+ numOutputRows += b.metrics("numOutputRows").value
+ case c: BatchScanExecTransformer =>
+ numPartitions += c.inputRDD.partitions.length
+ numOutputRows += c.metrics("numOutputRows").value
+ case f: FileSourceScanExecTransformer =>
+ numPartitions += f.inputRDD.partitions.length
+ numOutputRows += f.metrics("numOutputRows").value
+ case _ =>
+ }
+ assert(numPartitions > 0)
+ assert(numOutputRows > 0)
+
+ if (conf.useSmallSplits) {
+            // SPARK-39634: Until the fix for PARQUET-2161 is
available,
+ // it is not possible to split Parquet files into multiple
partitions while generating
+ // row indexes.
+ // assert(numPartitions >= 2 * conf.numFiles)
+ }
+
+ // Assert that every rowIdx value matches the value in
`expectedRowIdx`.
+ assert(
+ dfToAssert
+ .filter(s"$rowIndexColName != $expectedRowIdxCol")
+ .count() == 0)
+
+ if (conf.useFilter) {
+ if (conf.useSmallRowGroups) {
+ assert(numOutputRows < conf.numRows)
+ }
+
+ val minMaxRowIndexes =
+ dfToAssert.select(max(col(rowIndexColName)),
min(col(rowIndexColName))).collect()
+ val (expectedMaxRowIdx, expectedMinRowIdx) = if (conf.numFiles
== 1) {
+ // When there is a single file, we still have row group
skipping,
+ // but that should not affect the produced rowIdx.
+ (conf.numRows * skipCentileLast - 1, conf.numRows *
skipCentileFirst)
+ } else {
+ // For simplicity, the chosen filter skips the whole files.
+ // Thus all unskipped files will have the same max and min
rowIdx values.
+ (numRecordsPerFile - 1, 0)
+ }
+ assert(minMaxRowIndexes(0).get(0) == expectedMaxRowIdx)
+ assert(minMaxRowIndexes(0).get(1) == expectedMinRowIdx)
+ if (!conf.useMultipleFiles) {
+ val skippedValues = List.range(0, (skipCentileFirst *
conf.numRows).toInt) ++
+ List.range(
+ (skipCentileMidLeft * conf.numRows).toInt,
+ (skipCentileMidRight * conf.numRows).toInt) ++
+ List.range((skipCentileLast * conf.numRows).toInt,
conf.numRows)
+ // rowIdx column should not have any of the `skippedValues`.
+ assert(
+ dfToAssert
+ .filter(col(rowIndexColName).isin(skippedValues: _*))
+ .count() == 0)
+ }
+ } else {
+ assert(numOutputRows == conf.numRows)
+ // When there is no filter, the rowIdx values should be in range
+ // [0-`numRecordsPerFile`].
+ val expectedRowIdxValues = List.range(0, numRecordsPerFile)
+ assert(
+ dfToAssert
+ .filter(col(rowIndexColName).isin(expectedRowIdxValues: _*))
+ .count() == conf.numRows)
+ }
+ }
+ }
+ }
+ }
+
+ for (useDataSourceV2 <- Seq(true, false)) {
+ val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
+
+ testGluten(s"invalid row index column type - ${conf.desc}") {
+ withSQLConf(conf.sqlConfs: _*) {
+ withTempPath {
+ path =>
+ val df = spark.range(0, 10, 1, 1).toDF("id")
+ val schemaWithRowIdx = df.schema
+ .add(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, StringType)
+
+ df.write
+ .format(conf.writeFormat)
+ .save(path.getAbsolutePath)
+
+ val dfRead = spark.read
+ .format(conf.readFormat)
+ .schema(schemaWithRowIdx)
+ .load(path.getAbsolutePath)
-class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with
GlutenSQLTestsBaseTrait {}
+ val exception = intercept[Exception](dfRead.collect())
+
assert(exception.getMessage.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ }
+ }
+ }
+ }
+}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a59819411..61353d99f 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1198,8 +1198,9 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenResolveDefaultColumnsSuite]
enableSuite[GlutenSubqueryHintPropagationSuite]
enableSuite[GlutenUrlFunctionsSuite]
- // Row index metadata column support in Velox isn't ready yet, refer
velox-9147
- // enableSuite[GlutenParquetRowIndexSuite]
+ enableSuite[GlutenParquetRowIndexSuite]
+ .excludeByPrefix("row index generation")
+ .excludeByPrefix("invalid row index column type")
enableSuite[GlutenBitmapExpressionsQuerySuite]
enableSuite[GlutenEmptyInSuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
index acf6a2b63..abf21651f 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
@@ -16,6 +16,346 @@
*/
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.gluten.execution.{BatchScanExecTransformer,
FileSourceScanExecTransformer}
+
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
+import org.apache.spark.sql.functions.{col, max, min}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.ParquetProperties._
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with
GlutenSQLTestsBaseTrait {
+ import testImplicits._
+
+ private def readRowGroupRowCounts(path: String): Seq[Long] = {
+ ParquetFooterReader
+ .readFooter(
+ spark.sessionState.newHadoopConf(),
+ new Path(path),
+ ParquetMetadataConverter.NO_FILTER)
+ .getBlocks
+ .asScala
+ .map(_.getRowCount)
+ }
+
+ private def readRowGroupRowCounts(dir: File): Seq[Seq[Long]] = {
+ assert(dir.isDirectory)
+ dir
+ .listFiles()
+ .filter(f => f.isFile && f.getName.endsWith("parquet"))
+ .map(f => readRowGroupRowCounts(f.getAbsolutePath))
+ .toSeq
+ }
+
+ /** Do the files contain exactly one row group? */
+ private def assertOneRowGroup(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach {
+ rcs => assert(rcs.length == 1, "expected one row group per file")
+ }
+ }
+
+ /**
+   * Do the files have a good layout to test row group skipping (both via the
range metadata filter and by
+   * using min/max)?
+ */
+ private def assertTinyRowGroups(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach {
+ rcs =>
+ assert(rcs.length > 1, "expected multiple row groups per file")
+ assert(rcs.last <= DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
+ assert(
+ rcs.reverse.tail.distinct ==
Seq(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK),
+ "expected row groups with minimal row count")
+ }
+ }
+
+ /**
+ * Do the files have a good layout to test a combination of page skipping
and row group skipping?
+ */
+ private def assertIntermediateRowGroups(dir: File): Unit = {
+ readRowGroupRowCounts(dir).foreach {
+ rcs =>
+ assert(rcs.length >= 3, "expected at least 3 row groups per file")
+ rcs.reverse.tail.foreach {
+ rc =>
+ assert(
+ rc > DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+ "expected row groups larger than minimal row count")
+ }
+ }
+ }
+
+ case class GlutenRowIndexTestConf(
+ numRows: Long = 10000L,
+ useMultipleFiles: Boolean = false,
+ useVectorizedReader: Boolean = true,
+ useSmallPages: Boolean = false,
+ useSmallRowGroups: Boolean = false,
+ useSmallSplits: Boolean = false,
+ useFilter: Boolean = false,
+ useDataSourceV2: Boolean = false) {
+
+ val NUM_MULTIPLE_FILES = 4
+ // The test doesn't work correctly if the number of records per file is
uneven.
+ assert(!useMultipleFiles || (numRows % NUM_MULTIPLE_FILES == 0))
+
+ def numFiles: Int = if (useMultipleFiles) { NUM_MULTIPLE_FILES }
+ else { 1 }
+
+ def rowGroupSize: Long = if (useSmallRowGroups) {
+ if (useSmallPages) {
+ // Each file will contain multiple row groups. All of them (except for
the last one)
+ // will contain more than DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, so
that individual
+ // pages within the row group can be skipped.
+ 2048L
+ } else {
+ // Each file will contain multiple row groups. All of them (except for
the last one)
+ // will contain exactly DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+ 64L
+ }
+ } else {
+ // Each file will contain a single row group.
+ DEFAULT_BLOCK_SIZE
+ }
+
+ def pageSize: Long = if (useSmallPages) {
+ // Each page (except for the last one for each column) will contain
exactly
+ // DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+ 64L
+ } else {
+ DEFAULT_PAGE_SIZE
+ }
+
+ def writeFormat: String = "parquet"
+ def readFormat: String = if (useDataSourceV2) {
+ classOf[ParquetDataSourceV2].getCanonicalName
+ } else {
+ "parquet"
+ }
+
+ assert(useSmallRowGroups || !useSmallSplits)
+ def filesMaxPartitionBytes: Long = if (useSmallSplits) {
+ 256L
+ } else {
+ SQLConf.FILES_MAX_PARTITION_BYTES.defaultValue.get
+ }
+
+ def desc: String = {
+ { if (useVectorizedReader) Seq("vectorized reader") else Seq("parquet-mr
reader") } ++ {
+ if (useMultipleFiles) Seq("many files") else Seq.empty[String]
+ } ++ { if (useFilter) Seq("filtered") else Seq.empty[String] } ++ {
+ if (useSmallPages) Seq("small pages") else Seq.empty[String]
+ } ++ { if (useSmallRowGroups) Seq("small row groups") else
Seq.empty[String] } ++ {
+ if (useSmallSplits) Seq("small splits") else Seq.empty[String]
+ } ++ { if (useDataSourceV2) Seq("datasource v2") else Seq.empty[String] }
+ }.mkString(", ")
+
+ def sqlConfs: Seq[(String, String)] = Seq(
+      // TODO: remove this change after customized parquet options such as
`block_size` and `page_size`
+      // have been fully supported.
+ "spark.gluten.sql.native.writer.enabled" -> "false",
+ SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
useVectorizedReader.toString,
+ SQLConf.FILES_MAX_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString
+ ) ++ { if (useDataSourceV2) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "") else
Seq.empty }
+ }
+
+ for (useVectorizedReader <- Seq(true, false))
+ for (useDataSourceV2 <- Seq(true, false))
+ for (useSmallRowGroups <- Seq(true, false))
+ for (useSmallPages <- Seq(true, false))
+ for (useFilter <- Seq(true, false))
+ for (useSmallSplits <- Seq(useSmallRowGroups, false).distinct) {
+ val conf = GlutenRowIndexTestConf(
+ useVectorizedReader = useVectorizedReader,
+ useDataSourceV2 = useDataSourceV2,
+ useSmallRowGroups = useSmallRowGroups,
+ useSmallPages = useSmallPages,
+ useFilter = useFilter,
+ useSmallSplits = useSmallSplits
+ )
+ testRowIndexGeneration("row index generation", conf)
+ }
+
+ private def testRowIndexGeneration(label: String, conf:
GlutenRowIndexTestConf): Unit = {
+ testGluten(s"$label - ${conf.desc}") {
+ withSQLConf(conf.sqlConfs: _*) {
+ withTempPath {
+ path =>
+ // Read row index using _metadata.row_index if that is supported
by the file format.
+ val rowIndexMetadataColumnSupported = conf.readFormat match {
+ case "parquet" => true
+ case _ => false
+ }
+ val rowIndexColName = if (rowIndexMetadataColumnSupported) {
+ s"${FileFormat.METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}"
+ } else {
+ ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ }
+ val numRecordsPerFile = conf.numRows / conf.numFiles
+ val (skipCentileFirst, skipCentileMidLeft, skipCentileMidRight,
skipCentileLast) =
+ (0.2, 0.4, 0.6, 0.8)
+ val expectedRowIdxCol = "expected_rowIdx_col"
+ val df = spark
+ .range(0, conf.numRows, 1, conf.numFiles)
+ .toDF("id")
+ .withColumn("dummy_col", ($"id" / 55).cast("int"))
+ .withColumn(expectedRowIdxCol, ($"id" %
numRecordsPerFile).cast("int"))
+
+ // Add row index to schema if required.
+ val schemaWithRowIdx = if (rowIndexMetadataColumnSupported) {
+ df.schema
+ } else {
+ df.schema.add(rowIndexColName, LongType, nullable = true)
+ }
+
+ logInfo(s"gyytest schemaWithRowIndex $schemaWithRowIdx")
+
+ df.write
+ .format(conf.writeFormat)
+ .option(ParquetOutputFormat.BLOCK_SIZE, conf.rowGroupSize)
+ .option(ParquetOutputFormat.PAGE_SIZE, conf.pageSize)
+ .option(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, conf.pageSize)
+ .save(path.getAbsolutePath)
+ val dfRead = spark.read
+ .format(conf.readFormat)
+ .schema(schemaWithRowIdx)
+ .load(path.getAbsolutePath)
+ // Verify that the produced files are laid out as expected.
+ if (conf.useSmallRowGroups) {
+ if (conf.useSmallPages) {
+ assertIntermediateRowGroups(path)
+ } else {
+ assertTinyRowGroups(path)
+ }
+ } else {
+ assertOneRowGroup(path)
+ }
+
+ val dfToAssert = if (conf.useFilter) {
+ // Add a filter such that we skip 60% of the records:
+ // [0%, 20%], [40%, 60%], [80%, 100%]
+ dfRead.filter(
+ ($"id" >= (skipCentileFirst * conf.numRows).toInt &&
+ $"id" < (skipCentileMidLeft * conf.numRows).toInt) ||
+ ($"id" >= (skipCentileMidRight * conf.numRows).toInt &&
+ $"id" < (skipCentileLast * conf.numRows).toInt))
+ } else {
+ dfRead
+ }
+
+ var numPartitions: Long = 0
+ var numOutputRows: Long = 0
+ dfToAssert.collect()
+ logInfo(dfToAssert.queryExecution.executedPlan.toString())
+ dfToAssert.queryExecution.executedPlan.foreach {
+ case a: BatchScanExec =>
+ numPartitions += a.inputRDD.partitions.length
+ numOutputRows += a.metrics("numOutputRows").value
+ case b: FileSourceScanExec =>
+ numPartitions += b.inputRDD.partitions.length
+ numOutputRows += b.metrics("numOutputRows").value
+ case c: BatchScanExecTransformer =>
+ numPartitions += c.inputRDD.partitions.length
+ numOutputRows += c.metrics("numOutputRows").value
+ case f: FileSourceScanExecTransformer =>
+ numPartitions += f.inputRDD.partitions.length
+ numOutputRows += f.metrics("numOutputRows").value
+ case _ =>
+ }
+ assert(numPartitions > 0)
+ assert(numOutputRows > 0)
+
+ if (conf.useSmallSplits) {
+ assert(numPartitions >= 2 * conf.numFiles)
+ }
+
+ // Assert that every rowIdx value matches the value in
`expectedRowIdx`.
+ assert(
+ dfToAssert
+ .filter(s"$rowIndexColName != $expectedRowIdxCol")
+ .count() == 0)
+
+ if (conf.useFilter) {
+ if (conf.useSmallRowGroups) {
+ assert(numOutputRows < conf.numRows)
+ }
+
+ val minMaxRowIndexes =
+ dfToAssert.select(max(col(rowIndexColName)),
min(col(rowIndexColName))).collect()
+ val (expectedMaxRowIdx, expectedMinRowIdx) = if (conf.numFiles
== 1) {
+ // When there is a single file, we still have row group
skipping,
+ // but that should not affect the produced rowIdx.
+ (conf.numRows * skipCentileLast - 1, conf.numRows *
skipCentileFirst)
+ } else {
+ // For simplicity, the chosen filter skips the whole files.
+ // Thus all unskipped files will have the same max and min
rowIdx values.
+ (numRecordsPerFile - 1, 0)
+ }
+ assert(minMaxRowIndexes(0).get(0) == expectedMaxRowIdx)
+ assert(minMaxRowIndexes(0).get(1) == expectedMinRowIdx)
+ if (!conf.useMultipleFiles) {
+ val skippedValues = List.range(0, (skipCentileFirst *
conf.numRows).toInt) ++
+ List.range(
+ (skipCentileMidLeft * conf.numRows).toInt,
+ (skipCentileMidRight * conf.numRows).toInt) ++
+ List.range((skipCentileLast * conf.numRows).toInt,
conf.numRows)
+ // rowIdx column should not have any of the `skippedValues`.
+ assert(
+ dfToAssert
+ .filter(col(rowIndexColName).isin(skippedValues: _*))
+ .count() == 0)
+ }
+ } else {
+ // assert(numOutputRows == conf.numRows)
+ // When there is no filter, the rowIdx values should be in range
+ // [0-`numRecordsPerFile`].
+ val expectedRowIdxValues = List.range(0, numRecordsPerFile)
+ assert(
+ dfToAssert
+ .filter(col(rowIndexColName).isin(expectedRowIdxValues: _*))
+ .count() == conf.numRows)
+ }
+ }
+ }
+ }
+ }
+ for (useDataSourceV2 <- Seq(true, false)) {
+ val conf = GlutenRowIndexTestConf(useDataSourceV2 = useDataSourceV2)
+
+ testGluten(s"invalid row index column type - ${conf.desc}") {
+ withSQLConf(conf.sqlConfs: _*) {
+ withTempPath {
+ path =>
+ val df = spark.range(0, 10, 1, 1).toDF("id")
+ val schemaWithRowIdx = df.schema
+ .add(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
StringType)
+
+ df.write
+ .format(conf.writeFormat)
+ .save(path.getAbsolutePath)
+
+ val dfRead = spark.read
+ .format(conf.readFormat)
+ .schema(schemaWithRowIdx)
+ .load(path.getAbsolutePath)
-class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with
GlutenSQLTestsBaseTrait {}
+ val exception = intercept[Exception](dfRead.collect())
+
assert(exception.getMessage.contains(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ }
+ }
+ }
+ }
+}
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 007381fe6..4db784782 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -42,6 +42,13 @@ abstract class BatchScanExecShim(
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
+ def metadataColumns: Seq[AttributeReference] = Seq.empty
+
+ def hasUnsupportedColumns: Boolean = {
+ // Below name has special meaning in Velox.
+ output.exists(a => a.name == "$path" || a.name == "$bucket")
+ }
+
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
throw new UnsupportedOperationException("Need to implement this method")
}
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index dcfb5c950..76556052c 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -46,6 +46,20 @@ abstract class BatchScanExecShim(
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
+ lazy val metadataColumns: Seq[AttributeReference] = output.collect {
+ case FileSourceMetadataAttribute(attr) => attr
+ }
+
+ def hasUnsupportedColumns: Boolean = {
+ // TODO: fall back if the user defines a column with the same name, because we
+ // currently cannot tell metadata columns apart from user-defined columns.
+ val metadataColumnsNames = metadataColumns.map(_.name)
+ output
+ .filterNot(metadataColumns.toSet)
+ .exists(v => metadataColumnsNames.contains(v.name)) ||
+ output.exists(a => a.name == "$path" || a.name == "$bucket")
+ }
+
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
throw new UnsupportedOperationException("Need to implement this method")
}
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 4fc09f3ae..15455d51c 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -66,7 +66,10 @@ abstract class FileSourceScanExecShim(
.filterNot(metadataColumns.toSet)
.exists(v => metadataColumnsNames.contains(v.name)) ||
// Below name has special meaning in Velox.
- output.exists(a => a.name == "$path" || a.name == "$bucket")
+ output.exists(
+ a =>
+ a.name == "$path" || a.name == "$bucket" ||
+ a.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
}
def isMetadataColumn(attr: Attribute): Boolean =
metadataColumns.contains(attr)
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 4c12356d6..ca9a7eb2d 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -55,6 +56,25 @@ abstract class BatchScanExecShim(
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
+ lazy val metadataColumns: Seq[AttributeReference] = output.collect {
+ case FileSourceConstantMetadataAttribute(attr) => attr
+ case FileSourceGeneratedMetadataAttribute(attr) => attr
+ }
+
+ def hasUnsupportedColumns: Boolean = {
+ // TODO: fall back if the user defines a column with the same name, because we
+ // currently cannot tell metadata columns apart from user-defined columns.
+ val metadataColumnsNames = metadataColumns.map(_.name)
+ metadataColumnsNames.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
||
+ output
+ .filterNot(metadataColumns.toSet)
+ .exists(v => metadataColumnsNames.contains(v.name)) ||
+ output.exists(
+ a =>
+ a.name == "$path" || a.name == "$bucket" ||
+ a.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
+ }
+
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
throw new UnsupportedOperationException("Need to implement this method")
}
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index b3bb2d293..6295bcbc4 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -71,7 +71,10 @@ abstract class FileSourceScanExecShim(
output
.filterNot(metadataColumns.toSet)
.exists(v => metadataColumnsNames.contains(v.name)) ||
- output.exists(a => a.name == "$path" || a.name == "$bucket")
+ output.exists(
+ a =>
+ a.name == "$path" || a.name == "$bucket" ||
+ a.name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
}
def isMetadataColumn(attr: Attribute): Boolean =
metadataColumns.contains(attr)
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index bb3806097..47adf16fb 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -57,6 +58,25 @@ abstract class BatchScanExecShim(
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] = Map()
+ lazy val metadataColumns: Seq[AttributeReference] = output.collect {
+ case FileSourceConstantMetadataAttribute(attr) => attr
+ case FileSourceGeneratedMetadataAttribute(attr, _) => attr
+ }
+
+ def hasUnsupportedColumns: Boolean = {
+ // TODO: fall back if the user defines a column with the same name, because we
+ // currently cannot tell metadata columns apart from user-defined columns.
+ val metadataColumnsNames = metadataColumns.map(_.name)
+
metadataColumnsNames.contains(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
||
+ output
+ .filterNot(metadataColumns.toSet)
+ .exists(v => metadataColumnsNames.contains(v.name)) ||
+ output.exists(
+ a =>
+ a.name == "$path" || a.name == "$bucket" ||
+ a.name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
+ }
+
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
throw new UnsupportedOperationException("Need to implement this method")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]