This is an automated email from the ASF dual-hosted git repository.

kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a53ecc4a1 [VL] Enable GlutenParquetRowIndexSuite for Spark 3.4/3.5 
(#5740)
a53ecc4a1 is described below

commit a53ecc4a1261afe52c659a4333af20c687552f0c
Author: 高阳阳 <[email protected]>
AuthorDate: Wed May 15 14:35:20 2024 +0800

    [VL] Enable GlutenParquetRowIndexSuite for Spark 3.4/3.5 (#5740)
---
 .../execution/BatchScanExecTransformer.scala       |   5 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |   8 +-
 .../parquet/GlutenParquetRowIndexSuite.scala       | 335 +++++++++++++++++++-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   5 +-
 .../parquet/GlutenParquetRowIndexSuite.scala       | 342 ++++++++++++++++++++-
 .../datasources/v2/BatchScanExecShim.scala         |   7 +
 .../datasources/v2/BatchScanExecShim.scala         |  14 +
 .../sql/execution/FileSourceScanExecShim.scala     |   5 +-
 .../datasources/v2/BatchScanExecShim.scala         |  20 ++
 .../sql/execution/FileSourceScanExecShim.scala     |   5 +-
 .../datasources/v2/BatchScanExecShim.scala         |  20 ++
 11 files changed, 756 insertions(+), 10 deletions(-)

diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 3aeeffae1..b0c8c59e7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -136,6 +136,11 @@ abstract class BatchScanExecTransformerBase(
     if (pushedAggregate.nonEmpty) {
       return ValidationResult.notOk(s"Unsupported aggregation push down for 
$scan.")
     }
+
+    if (hasUnsupportedColumns) {
+      return ValidationResult.notOk(s"Unsupported columns scan in native.")
+    }
+
     super.doValidateInternal()
   }
 
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 498ed5ef4..1afa203ab 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -30,7 +30,7 @@ import 
org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParser
 import 
org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite
 import 
org.apache.spark.sql.execution.datasources.json.{GlutenJsonLegacyTimeParserSuite,
 GlutenJsonV1Suite, GlutenJsonV2Suite}
 import 
org.apache.spark.sql.execution.datasources.orc.{GlutenOrcColumnarBatchReaderSuite,
 GlutenOrcFilterSuite, GlutenOrcPartitionDiscoverySuite, GlutenOrcSourceSuite, 
GlutenOrcV1FilterSuite, GlutenOrcV1PartitionDiscoverySuite, 
GlutenOrcV1QuerySuite, GlutenOrcV1SchemaPruningSuite, GlutenOrcV2QuerySuite, 
GlutenOrcV2SchemaPruningSuite}
-import 
org.apache.spark.sql.execution.datasources.parquet.{GlutenParquetColumnIndexSuite,
 GlutenParquetCompressionCodecPrecedenceSuite, 
GlutenParquetDeltaByteArrayEncodingSuite, GlutenParquetDeltaEncodingInteger, 
GlutenParquetDeltaEncodingLong, GlutenParquetDeltaLengthByteArrayEncodingSuite, 
GlutenParquetEncodingSuite, GlutenParquetFieldIdIOSuite, 
GlutenParquetFileFormatV1Suite, GlutenParquetFileFormatV2Suite, 
GlutenParquetInteroperabilitySuite, GlutenParquetIOSuite, GlutenParquetProtobu 
[...]
+import 
org.apache.spark.sql.execution.datasources.parquet.{GlutenParquetColumnIndexSuite,
 GlutenParquetCompressionCodecPrecedenceSuite, 
GlutenParquetDeltaByteArrayEncodingSuite, GlutenParquetDeltaEncodingInteger, 
GlutenParquetDeltaEncodingLong, GlutenParquetDeltaLengthByteArrayEncodingSuite, 
GlutenParquetEncodingSuite, GlutenParquetFieldIdIOSuite, 
GlutenParquetFileFormatV1Suite, GlutenParquetFileFormatV2Suite, 
GlutenParquetInteroperabilitySuite, GlutenParquetIOSuite, GlutenParquetProtobu 
[...]
 import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, 
GlutenTextV2Suite}
 import 
org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, 
GlutenFileTableSuite, GlutenV2PredicateSuite}
 import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
@@ -1189,9 +1189,9 @@ class VeloxTestSettings extends BackendTestSettings {
     // Row index metadata column support in Velox isn't ready yet, refer 
velox-9147
     .exclude("reading _tmp_metadata_row_index - not present in a table")
     .exclude("reading _tmp_metadata_row_index - present in a table")
-  // Row index metadata column support in Velox isn't ready yet, refer 
velox-9147
-  // enableSuite[GlutenParquetRowIndexSuite]
-
+  enableSuite[GlutenParquetRowIndexSuite]
+    .excludeByPrefix("row index generation")
+    .excludeByPrefix("invalid row index column type")
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
 // scalastyle:on line.size.limit
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
index acf6a2b63..6f153450c 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
@@ -16,6 +16,339 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.gluten.execution.{BatchScanExecTransformer, 
FileSourceScanExecTransformer}
+
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import 
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
+import org.apache.spark.sql.functions.{col, max, min}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.ParquetProperties._
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with 
GlutenSQLTestsBaseTrait {
+  import testImplicits._
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    sparkContext.setLogLevel("info")
+  }
+
+  private def readRowGroupRowCounts(path: String): Seq[Long] = {
+    ParquetFooterReader
+      .readFooter(
+        spark.sessionState.newHadoopConf(),
+        new Path(path),
+        ParquetMetadataConverter.NO_FILTER)
+      .getBlocks
+      .asScala
+      .map(_.getRowCount)
+  }
+
+  private def readRowGroupRowCounts(dir: File): Seq[Seq[Long]] = {
+    assert(dir.isDirectory)
+    dir
+      .listFiles()
+      .filter(f => f.isFile && f.getName.endsWith("parquet"))
+      .map(f => readRowGroupRowCounts(f.getAbsolutePath))
+  }
+
+  /** Do the files contain exactly one row group? */
+  private def assertOneRowGroup(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach {
+      rcs => assert(rcs.length == 1, "expected one row group per file")
+    }
+  }
+
+  /**
+   * Do the files have a good layout to test row group skipping (both range 
metadata filter, and by
+   * using min/max).
+   */
+  private def assertTinyRowGroups(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach {
+      rcs =>
+        assert(rcs.length > 1, "expected multiple row groups per file")
+        assert(rcs.last <= DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
+        assert(
+          rcs.reverse.tail.distinct == 
Seq(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK),
+          "expected row groups with minimal row count")
+    }
+  }
+
+  /**
+   * Do the files have a good layout to test a combination of page skipping 
and row group skipping?
+   */
+  private def assertIntermediateRowGroups(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach {
+      rcs =>
+        assert(rcs.length >= 3, "expected at least 3 row groups per file")
+        rcs.reverse.tail.foreach {
+          rc =>
+            assert(
+              rc > DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+              "expected row groups larger than minimal row count")
+        }
+    }
+  }
+
+  case class GlutenRowIndexTestConf(
+      numRows: Long = 10000L,
+      useMultipleFiles: Boolean = false,
+      useVectorizedReader: Boolean = true,
+      useSmallPages: Boolean = false,
+      useSmallRowGroups: Boolean = false,
+      useSmallSplits: Boolean = false,
+      useFilter: Boolean = false,
+      useDataSourceV2: Boolean = false) {
+
+    val NUM_MULTIPLE_FILES = 4
+    // The test doesn't work correctly if the number of records per file is 
uneven.
+    assert(!useMultipleFiles || (numRows % NUM_MULTIPLE_FILES == 0))
+
+    def numFiles: Int = if (useMultipleFiles) { NUM_MULTIPLE_FILES }
+    else { 1 }
+
+    def rowGroupSize: Long = if (useSmallRowGroups) {
+      if (useSmallPages) {
+        // Each file will contain multiple row groups. All of them (except for 
the last one)
+        // will contain more than DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, so 
that individual
+        // pages within the row group can be skipped.
+        2048L
+      } else {
+        // Each file will contain multiple row groups. All of them (except for 
the last one)
+        // will contain exactly DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+        64L
+      }
+    } else {
+      // Each file will contain a single row group.
+      DEFAULT_BLOCK_SIZE
+    }
+
+    def pageSize: Long = if (useSmallPages) {
+      // Each page (except for the last one for each column) will contain 
exactly
+      // DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+      64L
+    } else {
+      DEFAULT_PAGE_SIZE
+    }
+
+    def writeFormat: String = "parquet"
+    def readFormat: String = if (useDataSourceV2) {
+      classOf[ParquetDataSourceV2].getCanonicalName
+    } else {
+      "parquet"
+    }
+
+    assert(useSmallRowGroups || !useSmallSplits)
+    def filesMaxPartitionBytes: Long = if (useSmallSplits) {
+      256L
+    } else {
+      SQLConf.FILES_MAX_PARTITION_BYTES.defaultValue.get
+    }
+
+    def desc: String = {
+      { if (useVectorizedReader) Seq("vectorized reader") else Seq("parquet-mr 
reader") } ++ {
+        if (useMultipleFiles) Seq("many files") else Seq.empty[String]
+      } ++ { if (useFilter) Seq("filtered") else Seq.empty[String] } ++ {
+        if (useSmallPages) Seq("small pages") else Seq.empty[String]
+      } ++ { if (useSmallRowGroups) Seq("small row groups") else 
Seq.empty[String] } ++ {
+        if (useSmallSplits) Seq("small splits") else Seq.empty[String]
+      } ++ { if (useDataSourceV2) Seq("datasource v2") else Seq.empty[String] }
+    }.mkString(", ")
+
+    def sqlConfs: Seq[(String, String)] = Seq(
+      // TODO: remove this change after customized parquet options as 
`block_size`, `page_size`
+      // been fully supported.
+      "spark.gluten.sql.native.writer.enabled" -> "false",
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
useVectorizedReader.toString,
+      SQLConf.FILES_MAX_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString
+    ) ++ { if (useDataSourceV2) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "") else 
Seq.empty }
+  }
+
+  for (useVectorizedReader <- Seq(true, false))
+    for (useDataSourceV2 <- Seq(true, false))
+      for (useSmallRowGroups <- Seq(true, false))
+        for (useSmallPages <- Seq(true, false))
+          for (useFilter <- Seq(true, false))
+            for (useSmallSplits <- Seq(useSmallRowGroups, false).distinct) {
+              val conf = GlutenRowIndexTestConf(
+                useVectorizedReader = useVectorizedReader,
+                useDataSourceV2 = useDataSourceV2,
+                useSmallRowGroups = useSmallRowGroups,
+                useSmallPages = useSmallPages,
+                useFilter = useFilter,
+                useSmallSplits = useSmallSplits
+              )
+              testRowIndexGeneration("row index generation", conf)
+            }
+
+  private def testRowIndexGeneration(label: String, conf: 
GlutenRowIndexTestConf): Unit = {
+    testGluten(s"$label - ${conf.desc}") {
+      withSQLConf(conf.sqlConfs: _*) {
+        withTempPath {
+          path =>
+            val rowIndexColName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+            val numRecordsPerFile = conf.numRows / conf.numFiles
+            val (skipCentileFirst, skipCentileMidLeft, skipCentileMidRight, 
skipCentileLast) =
+              (0.2, 0.4, 0.6, 0.8)
+            val expectedRowIdxCol = "expected_rowIdx_col"
+            val df = spark
+              .range(0, conf.numRows, 1, conf.numFiles)
+              .toDF("id")
+              .withColumn("dummy_col", ($"id" / 55).cast("int"))
+              .withColumn(expectedRowIdxCol, ($"id" % 
numRecordsPerFile).cast("int"))
+
+            // With row index in schema.
+            val schemaWithRowIdx = df.schema.add(rowIndexColName, LongType, 
nullable = true)
+
+            df.write
+              .format(conf.writeFormat)
+              .option(ParquetOutputFormat.BLOCK_SIZE, conf.rowGroupSize)
+              .option(ParquetOutputFormat.PAGE_SIZE, conf.pageSize)
+              .option(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, conf.pageSize)
+              .save(path.getAbsolutePath)
+            val dfRead = spark.read
+              .format(conf.readFormat)
+              .schema(schemaWithRowIdx)
+              .load(path.getAbsolutePath)
+
+            // Verify that the produced files are laid out as expected.
+            if (conf.useSmallRowGroups) {
+              if (conf.useSmallPages) {
+                assertIntermediateRowGroups(path)
+              } else {
+                assertTinyRowGroups(path)
+              }
+            } else {
+              assertOneRowGroup(path)
+            }
+
+            val dfToAssert = if (conf.useFilter) {
+              // Add a filter such that we skip 60% of the records:
+              // [0%, 20%], [40%, 60%], [80%, 100%]
+              dfRead.filter(
+                ($"id" >= (skipCentileFirst * conf.numRows).toInt &&
+                  $"id" < (skipCentileMidLeft * conf.numRows).toInt) ||
+                  ($"id" >= (skipCentileMidRight * conf.numRows).toInt &&
+                    $"id" < (skipCentileLast * conf.numRows).toInt))
+            } else {
+              dfRead
+            }
+
+            var numPartitions: Long = 0
+            var numOutputRows: Long = 0
+            dfToAssert.collect()
+            logInfo(dfToAssert.queryExecution.executedPlan.toString())
+            dfToAssert.queryExecution.executedPlan.foreach {
+              case a: BatchScanExec =>
+                numPartitions += a.inputRDD.partitions.length
+                numOutputRows += a.metrics("numOutputRows").value
+              case b: FileSourceScanExec =>
+                numPartitions += b.inputRDD.partitions.length
+                numOutputRows += b.metrics("numOutputRows").value
+              case c: BatchScanExecTransformer =>
+                numPartitions += c.inputRDD.partitions.length
+                numOutputRows += c.metrics("numOutputRows").value
+              case f: FileSourceScanExecTransformer =>
+                numPartitions += f.inputRDD.partitions.length
+                numOutputRows += f.metrics("numOutputRows").value
+              case _ =>
+            }
+            assert(numPartitions > 0)
+            assert(numOutputRows > 0)
+
+            if (conf.useSmallSplits) {
+              // SPARK-39634: Until the fix for PARQUET-2161 is available,
+              // it is not possible to split Parquet files into multiple 
partitions while generating
+              // row indexes.
+              // assert(numPartitions >= 2 * conf.numFiles)
+            }
+
+            // Assert that every rowIdx value matches the value in 
`expectedRowIdx`.
+            assert(
+              dfToAssert
+                .filter(s"$rowIndexColName != $expectedRowIdxCol")
+                .count() == 0)
+
+            if (conf.useFilter) {
+              if (conf.useSmallRowGroups) {
+                assert(numOutputRows < conf.numRows)
+              }
+
+              val minMaxRowIndexes =
+                dfToAssert.select(max(col(rowIndexColName)), 
min(col(rowIndexColName))).collect()
+              val (expectedMaxRowIdx, expectedMinRowIdx) = if (conf.numFiles 
== 1) {
+                // When there is a single file, we still have row group 
skipping,
+                // but that should not affect the produced rowIdx.
+                (conf.numRows * skipCentileLast - 1, conf.numRows * 
skipCentileFirst)
+              } else {
+                // For simplicity, the chosen filter skips the whole files.
+                // Thus all unskipped files will have the same max and min 
rowIdx values.
+                (numRecordsPerFile - 1, 0)
+              }
+              assert(minMaxRowIndexes(0).get(0) == expectedMaxRowIdx)
+              assert(minMaxRowIndexes(0).get(1) == expectedMinRowIdx)
+              if (!conf.useMultipleFiles) {
+                val skippedValues = List.range(0, (skipCentileFirst * 
conf.numRows).toInt) ++
+                  List.range(
+                    (skipCentileMidLeft * conf.numRows).toInt,
+                    (skipCentileMidRight * conf.numRows).toInt) ++
+                  List.range((skipCentileLast * conf.numRows).toInt, 
conf.numRows)
+                // rowIdx column should not have any of the `skippedValues`.
+                assert(
+                  dfToAssert
+                    .filter(col(rowIndexColName).isin(skippedValues: _*))
+                    .count() == 0)
+              }
+            } else {
+              assert(numOutputRows == conf.numRows)
+              // When there is no filter, the rowIdx values should be in range
+              // [0-`numRecordsPerFile`].
+              val expectedRowIdxValues = List.range(0, numRecordsPerFile)
+              assert(
+                dfToAssert
+                  .filter(col(rowIndexColName).isin(expectedRowIdxValues: _*))
+                  .count() == conf.numRows)
+            }
+        }
+      }
+    }
+  }
+
+  for (useDataSourceV2 <- Seq(true, false)) {
+    val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
+
+    testGluten(s"invalid row index column type - ${conf.desc}") {
+      withSQLConf(conf.sqlConfs: _*) {
+        withTempPath {
+          path =>
+            val df = spark.range(0, 10, 1, 1).toDF("id")
+            val schemaWithRowIdx = df.schema
+              .add(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, StringType)
+
+            df.write
+              .format(conf.writeFormat)
+              .save(path.getAbsolutePath)
+
+            val dfRead = spark.read
+              .format(conf.readFormat)
+              .schema(schemaWithRowIdx)
+              .load(path.getAbsolutePath)
 
-class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with 
GlutenSQLTestsBaseTrait {}
+            val exception = intercept[Exception](dfRead.collect())
+            
assert(exception.getMessage.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+        }
+      }
+    }
+  }
+}
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a59819411..61353d99f 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1198,8 +1198,9 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenResolveDefaultColumnsSuite]
   enableSuite[GlutenSubqueryHintPropagationSuite]
   enableSuite[GlutenUrlFunctionsSuite]
-  // Row index metadata column support in Velox isn't ready yet, refer 
velox-9147
-  // enableSuite[GlutenParquetRowIndexSuite]
+  enableSuite[GlutenParquetRowIndexSuite]
+    .excludeByPrefix("row index generation")
+    .excludeByPrefix("invalid row index column type")
   enableSuite[GlutenBitmapExpressionsQuerySuite]
   enableSuite[GlutenEmptyInSuite]
   enableSuite[GlutenRuntimeNullChecksV2Writes]
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
index acf6a2b63..abf21651f 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala
@@ -16,6 +16,346 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.gluten.execution.{BatchScanExecTransformer, 
FileSourceScanExecTransformer}
+
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import 
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
+import org.apache.spark.sql.functions.{col, max, min}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{LongType, StringType}
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.column.ParquetProperties._
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with 
GlutenSQLTestsBaseTrait {
+  import testImplicits._
+
+  private def readRowGroupRowCounts(path: String): Seq[Long] = {
+    ParquetFooterReader
+      .readFooter(
+        spark.sessionState.newHadoopConf(),
+        new Path(path),
+        ParquetMetadataConverter.NO_FILTER)
+      .getBlocks
+      .asScala
+      .map(_.getRowCount)
+  }
+
+  private def readRowGroupRowCounts(dir: File): Seq[Seq[Long]] = {
+    assert(dir.isDirectory)
+    dir
+      .listFiles()
+      .filter(f => f.isFile && f.getName.endsWith("parquet"))
+      .map(f => readRowGroupRowCounts(f.getAbsolutePath))
+      .toSeq
+  }
+
+  /** Do the files contain exactly one row group? */
+  private def assertOneRowGroup(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach {
+      rcs => assert(rcs.length == 1, "expected one row group per file")
+    }
+  }
+
+  /**
+   * Do the files have a good layout to test row group skipping (both range 
metadata filter, and by
+   * using min/max).
+   */
+  private def assertTinyRowGroups(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach {
+      rcs =>
+        assert(rcs.length > 1, "expected multiple row groups per file")
+        assert(rcs.last <= DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)
+        assert(
+          rcs.reverse.tail.distinct == 
Seq(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK),
+          "expected row groups with minimal row count")
+    }
+  }
+
+  /**
+   * Do the files have a good layout to test a combination of page skipping 
and row group skipping?
+   */
+  private def assertIntermediateRowGroups(dir: File): Unit = {
+    readRowGroupRowCounts(dir).foreach {
+      rcs =>
+        assert(rcs.length >= 3, "expected at least 3 row groups per file")
+        rcs.reverse.tail.foreach {
+          rc =>
+            assert(
+              rc > DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+              "expected row groups larger than minimal row count")
+        }
+    }
+  }
+
+  case class GlutenRowIndexTestConf(
+      numRows: Long = 10000L,
+      useMultipleFiles: Boolean = false,
+      useVectorizedReader: Boolean = true,
+      useSmallPages: Boolean = false,
+      useSmallRowGroups: Boolean = false,
+      useSmallSplits: Boolean = false,
+      useFilter: Boolean = false,
+      useDataSourceV2: Boolean = false) {
+
+    val NUM_MULTIPLE_FILES = 4
+    // The test doesn't work correctly if the number of records per file is 
uneven.
+    assert(!useMultipleFiles || (numRows % NUM_MULTIPLE_FILES == 0))
+
+    def numFiles: Int = if (useMultipleFiles) { NUM_MULTIPLE_FILES }
+    else { 1 }
+
+    def rowGroupSize: Long = if (useSmallRowGroups) {
+      if (useSmallPages) {
+        // Each file will contain multiple row groups. All of them (except for 
the last one)
+        // will contain more than DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, so 
that individual
+        // pages within the row group can be skipped.
+        2048L
+      } else {
+        // Each file will contain multiple row groups. All of them (except for 
the last one)
+        // will contain exactly DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+        64L
+      }
+    } else {
+      // Each file will contain a single row group.
+      DEFAULT_BLOCK_SIZE
+    }
+
+    def pageSize: Long = if (useSmallPages) {
+      // Each page (except for the last one for each column) will contain 
exactly
+      // DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK records.
+      64L
+    } else {
+      DEFAULT_PAGE_SIZE
+    }
+
+    def writeFormat: String = "parquet"
+    def readFormat: String = if (useDataSourceV2) {
+      classOf[ParquetDataSourceV2].getCanonicalName
+    } else {
+      "parquet"
+    }
+
+    assert(useSmallRowGroups || !useSmallSplits)
+    def filesMaxPartitionBytes: Long = if (useSmallSplits) {
+      256L
+    } else {
+      SQLConf.FILES_MAX_PARTITION_BYTES.defaultValue.get
+    }
+
+    def desc: String = {
+      { if (useVectorizedReader) Seq("vectorized reader") else Seq("parquet-mr 
reader") } ++ {
+        if (useMultipleFiles) Seq("many files") else Seq.empty[String]
+      } ++ { if (useFilter) Seq("filtered") else Seq.empty[String] } ++ {
+        if (useSmallPages) Seq("small pages") else Seq.empty[String]
+      } ++ { if (useSmallRowGroups) Seq("small row groups") else 
Seq.empty[String] } ++ {
+        if (useSmallSplits) Seq("small splits") else Seq.empty[String]
+      } ++ { if (useDataSourceV2) Seq("datasource v2") else Seq.empty[String] }
+    }.mkString(", ")
+
+    def sqlConfs: Seq[(String, String)] = Seq(
+      // TODO: remove this change after customized parquet options as 
`block_size`, `page_size`
+      // been fully supported.
+      "spark.gluten.sql.native.writer.enabled" -> "false",
+      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
useVectorizedReader.toString,
+      SQLConf.FILES_MAX_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString
+    ) ++ { if (useDataSourceV2) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "") else 
Seq.empty }
+  }
+
+  for (useVectorizedReader <- Seq(true, false))
+    for (useDataSourceV2 <- Seq(true, false))
+      for (useSmallRowGroups <- Seq(true, false))
+        for (useSmallPages <- Seq(true, false))
+          for (useFilter <- Seq(true, false))
+            for (useSmallSplits <- Seq(useSmallRowGroups, false).distinct) {
+              val conf = GlutenRowIndexTestConf(
+                useVectorizedReader = useVectorizedReader,
+                useDataSourceV2 = useDataSourceV2,
+                useSmallRowGroups = useSmallRowGroups,
+                useSmallPages = useSmallPages,
+                useFilter = useFilter,
+                useSmallSplits = useSmallSplits
+              )
+              testRowIndexGeneration("row index generation", conf)
+            }
+
+  private def testRowIndexGeneration(label: String, conf: 
GlutenRowIndexTestConf): Unit = {
+    testGluten(s"$label - ${conf.desc}") {
+      withSQLConf(conf.sqlConfs: _*) {
+        withTempPath {
+          path =>
+            // Read row index using _metadata.row_index if that is supported 
by the file format.
+            val rowIndexMetadataColumnSupported = conf.readFormat match {
+              case "parquet" => true
+              case _ => false
+            }
+            val rowIndexColName = if (rowIndexMetadataColumnSupported) {
+              s"${FileFormat.METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}"
+            } else {
+              ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+            }
+            val numRecordsPerFile = conf.numRows / conf.numFiles
+            val (skipCentileFirst, skipCentileMidLeft, skipCentileMidRight, 
skipCentileLast) =
+              (0.2, 0.4, 0.6, 0.8)
+            val expectedRowIdxCol = "expected_rowIdx_col"
+            val df = spark
+              .range(0, conf.numRows, 1, conf.numFiles)
+              .toDF("id")
+              .withColumn("dummy_col", ($"id" / 55).cast("int"))
+              .withColumn(expectedRowIdxCol, ($"id" % 
numRecordsPerFile).cast("int"))
+
+            // Add row index to schema if required.
+            val schemaWithRowIdx = if (rowIndexMetadataColumnSupported) {
+              df.schema
+            } else {
+              df.schema.add(rowIndexColName, LongType, nullable = true)
+            }
+
+            logInfo(s"gyytest schemaWithRowIndex $schemaWithRowIdx")
+
+            df.write
+              .format(conf.writeFormat)
+              .option(ParquetOutputFormat.BLOCK_SIZE, conf.rowGroupSize)
+              .option(ParquetOutputFormat.PAGE_SIZE, conf.pageSize)
+              .option(ParquetOutputFormat.DICTIONARY_PAGE_SIZE, conf.pageSize)
+              .save(path.getAbsolutePath)
+            val dfRead = spark.read
+              .format(conf.readFormat)
+              .schema(schemaWithRowIdx)
+              .load(path.getAbsolutePath)
+            // Verify that the produced files are laid out as expected.
+            if (conf.useSmallRowGroups) {
+              if (conf.useSmallPages) {
+                assertIntermediateRowGroups(path)
+              } else {
+                assertTinyRowGroups(path)
+              }
+            } else {
+              assertOneRowGroup(path)
+            }
+
+            val dfToAssert = if (conf.useFilter) {
+              // Add a filter such that we skip 60% of the records:
+              // [0%, 20%], [40%, 60%], [80%, 100%]
+              dfRead.filter(
+                ($"id" >= (skipCentileFirst * conf.numRows).toInt &&
+                  $"id" < (skipCentileMidLeft * conf.numRows).toInt) ||
+                  ($"id" >= (skipCentileMidRight * conf.numRows).toInt &&
+                    $"id" < (skipCentileLast * conf.numRows).toInt))
+            } else {
+              dfRead
+            }
+
+            var numPartitions: Long = 0
+            var numOutputRows: Long = 0
+            dfToAssert.collect()
+            logInfo(dfToAssert.queryExecution.executedPlan.toString())
+            dfToAssert.queryExecution.executedPlan.foreach {
+              case a: BatchScanExec =>
+                numPartitions += a.inputRDD.partitions.length
+                numOutputRows += a.metrics("numOutputRows").value
+              case b: FileSourceScanExec =>
+                numPartitions += b.inputRDD.partitions.length
+                numOutputRows += b.metrics("numOutputRows").value
+              case c: BatchScanExecTransformer =>
+                numPartitions += c.inputRDD.partitions.length
+                numOutputRows += c.metrics("numOutputRows").value
+              case f: FileSourceScanExecTransformer =>
+                numPartitions += f.inputRDD.partitions.length
+                numOutputRows += f.metrics("numOutputRows").value
+              case _ =>
+            }
+            assert(numPartitions > 0)
+            assert(numOutputRows > 0)
+
+            if (conf.useSmallSplits) {
+              assert(numPartitions >= 2 * conf.numFiles)
+            }
+
+            // Assert that every rowIdx value matches the value in 
`expectedRowIdx`.
+            assert(
+              dfToAssert
+                .filter(s"$rowIndexColName != $expectedRowIdxCol")
+                .count() == 0)
+
+            if (conf.useFilter) {
+              if (conf.useSmallRowGroups) {
+                assert(numOutputRows < conf.numRows)
+              }
+
+              val minMaxRowIndexes =
+                dfToAssert.select(max(col(rowIndexColName)), 
min(col(rowIndexColName))).collect()
+              val (expectedMaxRowIdx, expectedMinRowIdx) = if (conf.numFiles 
== 1) {
+                // When there is a single file, we still have row group 
skipping,
+                // but that should not affect the produced rowIdx.
+                (conf.numRows * skipCentileLast - 1, conf.numRows * 
skipCentileFirst)
+              } else {
+                // For simplicity, the chosen filter skips entire files.
+                // Thus all unskipped files will have the same max and min 
rowIdx values.
+                (numRecordsPerFile - 1, 0)
+              }
+              assert(minMaxRowIndexes(0).get(0) == expectedMaxRowIdx)
+              assert(minMaxRowIndexes(0).get(1) == expectedMinRowIdx)
+              if (!conf.useMultipleFiles) {
+                val skippedValues = List.range(0, (skipCentileFirst * 
conf.numRows).toInt) ++
+                  List.range(
+                    (skipCentileMidLeft * conf.numRows).toInt,
+                    (skipCentileMidRight * conf.numRows).toInt) ++
+                  List.range((skipCentileLast * conf.numRows).toInt, 
conf.numRows)
+                // rowIdx column should not have any of the `skippedValues`.
+                assert(
+                  dfToAssert
+                    .filter(col(rowIndexColName).isin(skippedValues: _*))
+                    .count() == 0)
+              }
+            } else {
+              //            assert(numOutputRows == conf.numRows)
+              // When there is no filter, the rowIdx values should be in the range
+              // [0, `numRecordsPerFile`).
+              val expectedRowIdxValues = List.range(0, numRecordsPerFile)
+              assert(
+                dfToAssert
+                  .filter(col(rowIndexColName).isin(expectedRowIdxValues: _*))
+                  .count() == conf.numRows)
+            }
+        }
+      }
+    }
+  }
+  for (useDataSourceV2 <- Seq(true, false)) {
+    val conf = GlutenRowIndexTestConf(useDataSourceV2 = useDataSourceV2)
+
+    testGluten(s"invalid row index column type - ${conf.desc}") {
+      withSQLConf(conf.sqlConfs: _*) {
+        withTempPath {
+          path =>
+            val df = spark.range(0, 10, 1, 1).toDF("id")
+            val schemaWithRowIdx = df.schema
+              .add(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, 
StringType)
+
+            df.write
+              .format(conf.writeFormat)
+              .save(path.getAbsolutePath)
+
+            val dfRead = spark.read
+              .format(conf.readFormat)
+              .schema(schemaWithRowIdx)
+              .load(path.getAbsolutePath)
 
-class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with 
GlutenSQLTestsBaseTrait {}
+            val exception = intercept[Exception](dfRead.collect())
+            
assert(exception.getMessage.contains(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+        }
+      }
+    }
+  }
+}
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 007381fe6..4db784782 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -42,6 +42,13 @@ abstract class BatchScanExecShim(
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
 
+  def metadataColumns: Seq[AttributeReference] = Seq.empty
+
+  def hasUnsupportedColumns: Boolean = {
+    // The names below have special meaning in Velox.
+    output.exists(a => a.name == "$path" || a.name == "$bucket")
+  }
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     throw new UnsupportedOperationException("Need to implement this method")
   }
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index dcfb5c950..76556052c 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -46,6 +46,20 @@ abstract class BatchScanExecShim(
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
 
+  lazy val metadataColumns: Seq[AttributeReference] = output.collect {
+    case FileSourceMetadataAttribute(attr) => attr
+  }
+
+  def hasUnsupportedColumns: Boolean = {
+    // TODO: fall back if the user defines a column with the same name, because we
+    // currently cannot distinguish a metadata column from a user-defined column.
+    val metadataColumnsNames = metadataColumns.map(_.name)
+    output
+      .filterNot(metadataColumns.toSet)
+      .exists(v => metadataColumnsNames.contains(v.name)) ||
+    output.exists(a => a.name == "$path" || a.name == "$bucket")
+  }
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     throw new UnsupportedOperationException("Need to implement this method")
   }
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 4fc09f3ae..15455d51c 100644
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -66,7 +66,10 @@ abstract class FileSourceScanExecShim(
       .filterNot(metadataColumns.toSet)
       .exists(v => metadataColumnsNames.contains(v.name)) ||
     // Below name has special meaning in Velox.
-    output.exists(a => a.name == "$path" || a.name == "$bucket")
+    output.exists(
+      a =>
+        a.name == "$path" || a.name == "$bucket" ||
+          a.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
   }
 
   def isMetadataColumn(attr: Attribute): Boolean = 
metadataColumns.contains(attr)
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 4c12356d6..ca9a7eb2d 100644
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -55,6 +56,25 @@ abstract class BatchScanExecShim(
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
 
+  lazy val metadataColumns: Seq[AttributeReference] = output.collect {
+    case FileSourceConstantMetadataAttribute(attr) => attr
+    case FileSourceGeneratedMetadataAttribute(attr) => attr
+  }
+
+  def hasUnsupportedColumns: Boolean = {
+    // TODO: fall back if the user defines a column with the same name, because we
+    // currently cannot distinguish a metadata column from a user-defined column.
+    val metadataColumnsNames = metadataColumns.map(_.name)
+    metadataColumnsNames.contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) 
||
+    output
+      .filterNot(metadataColumns.toSet)
+      .exists(v => metadataColumnsNames.contains(v.name)) ||
+    output.exists(
+      a =>
+        a.name == "$path" || a.name == "$bucket" ||
+          a.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
+  }
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     throw new UnsupportedOperationException("Need to implement this method")
   }
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index b3bb2d293..6295bcbc4 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -71,7 +71,10 @@ abstract class FileSourceScanExecShim(
     output
       .filterNot(metadataColumns.toSet)
       .exists(v => metadataColumnsNames.contains(v.name)) ||
-    output.exists(a => a.name == "$path" || a.name == "$bucket")
+    output.exists(
+      a =>
+        a.name == "$path" || a.name == "$bucket" ||
+          a.name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
   }
 
   def isMetadataColumn(attr: Attribute): Boolean = 
metadataColumns.contains(attr)
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index bb3806097..47adf16fb 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -57,6 +58,25 @@ abstract class BatchScanExecShim(
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
 
+  lazy val metadataColumns: Seq[AttributeReference] = output.collect {
+    case FileSourceConstantMetadataAttribute(attr) => attr
+    case FileSourceGeneratedMetadataAttribute(attr, _) => attr
+  }
+
+  def hasUnsupportedColumns: Boolean = {
+    // TODO: fall back if the user defines a column with the same name, because we
+    // currently cannot distinguish a metadata column from a user-defined column.
+    val metadataColumnsNames = metadataColumns.map(_.name)
+    
metadataColumnsNames.contains(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
 ||
+    output
+      .filterNot(metadataColumns.toSet)
+      .exists(v => metadataColumnsNames.contains(v.name)) ||
+    output.exists(
+      a =>
+        a.name == "$path" || a.name == "$bucket" ||
+          a.name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
+  }
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     throw new UnsupportedOperationException("Need to implement this method")
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to