This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50b652e241f [SPARK-43226] Define extractors for file-constant metadata
50b652e241f is described below

commit 50b652e241f7e31b99303359ec53e26a8989a4f0
Author: Ryan Johnson <ryan.john...@databricks.com>
AuthorDate: Tue Apr 25 16:11:21 2023 +0800

    [SPARK-43226] Define extractors for file-constant metadata
    
    ### What changes were proposed in this pull request?
    
    File-source constant metadata columns are often derived indirectly from 
file-level metadata values rather than exposing those values directly. Add 
support for metadata extractors, so that we can express such columns in a 
generic way.
    
    ### Why are the changes needed?
    
    This allows the existing file-source metadata columns (previously 
hard-wired) to be expressed in a generic way, and also allows expensive 
metadata values to be derived lazily, only if the query actually selects them.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test. Plus, existing file-source metadata unit tests pass.
    
    Closes #40885 from 
ryan-johnson-databricks/file-constant-metadata-extractors.
    
    Authored-by: Ryan Johnson <ryan.john...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  33 +++---
 .../sql/execution/datasources/FileFormat.scala     | 124 ++++++++++++++-------
 .../sql/execution/datasources/FileScanRDD.scala    |  70 +++---------
 .../datasources/PartitioningAwareFileIndex.scala   |  10 +-
 .../FileSourceCustomMetadataStructSuite.scala      |  35 +++++-
 5 files changed, 156 insertions(+), 116 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0400a2b6abc..0d5091f4a97 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -548,10 +548,9 @@ case class FileSourceScanExec(
         hadoopConf = 
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
     val readRDD = if (bucketedScan) {
-      createBucketedReadRDD(relation.bucketSpec.get, readFile, 
dynamicallySelectedPartitions,
-        relation)
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, 
dynamicallySelectedPartitions)
     } else {
-      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
+      createReadRDD(readFile, dynamicallySelectedPartitions)
     }
     sendDriverMetrics()
     readRDD
@@ -617,13 +616,11 @@ case class FileSourceScanExec(
    * @param bucketSpec the bucketing spec.
    * @param readFile a function to read each (part of a) file.
    * @param selectedPartitions Hive-style partition that are part of the read.
-   * @param fsRelation [[HadoopFsRelation]] associated with the read.
    */
   private def createBucketedReadRDD(
       bucketSpec: BucketSpec,
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Array[PartitionDirectory],
-      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+      selectedPartitions: Array[PartitionDirectory]): RDD[InternalRow] = {
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
@@ -660,9 +657,10 @@ case class FileSourceScanExec(
       }
     }
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
-      new StructType(requiredSchema.fields ++ 
fsRelation.partitionSchema.fields),
-      fileConstantMetadataColumns, new 
FileSourceOptions(CaseInsensitiveMap(relation.options)))
+    new FileScanRDD(relation.sparkSession, readFile, filePartitions,
+      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
+      fileConstantMetadataColumns, 
relation.fileFormat.fileConstantMetadataExtractors,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
   }
 
   /**
@@ -671,20 +669,18 @@ case class FileSourceScanExec(
    *
    * @param readFile a function to read each (part of a) file.
    * @param selectedPartitions Hive-style partition that are part of the read.
-   * @param fsRelation [[HadoopFsRelation]] associated with the read.
    */
   private def createReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Array[PartitionDirectory],
-      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
-    val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+      selectedPartitions: Array[PartitionDirectory]): RDD[InternalRow] = {
+    val openCostInBytes = 
relation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val maxSplitBytes =
-      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
+      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, 
" +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     // Filter files with bucket pruning if possible
-    val bucketingEnabled = 
fsRelation.sparkSession.sessionState.conf.bucketingEnabled
+    val bucketingEnabled = 
relation.sparkSession.sessionState.conf.bucketingEnabled
     val shouldProcess: Path => Boolean = optionalBucketSet match {
       case Some(bucketSet) if bucketingEnabled =>
         // Do not prune the file if bucket file name is invalid
@@ -722,9 +718,10 @@ case class FileSourceScanExec(
     val partitions =
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, 
maxSplitBytes)
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
-      new StructType(requiredSchema.fields ++ 
fsRelation.partitionSchema.fields),
-      fileConstantMetadataColumns, new 
FileSourceOptions(CaseInsensitiveMap(relation.options)))
+    new FileScanRDD(relation.sparkSession, readFile, partitions,
+      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
+      fileConstantMetadataColumns, 
relation.fileFormat.fileConstantMetadataExtractors,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
   }
 
   // Filters unused DynamicPruningExpression expressions - one which has been 
replaced
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 72e3e1e2406..8ec1d3bb8c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, 
SplittableCompressionCodec}
 import org.apache.hadoop.mapreduce.Job
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -30,7 +31,6 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
 
 
 /**
@@ -190,19 +190,38 @@ trait FileFormat {
   /**
    * All fields the file format's _metadata struct defines.
    *
-   * Each field's metadata should define [[METADATA_COL_ATTR_KEY]],
-   * [[FILE_SOURCE_METADATA_COL_ATTR_KEY]], and either
-   * [[FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY]] or
-   * [[FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY]] as appropriate.
+   * Each metadata struct field is either "constant" or "generated" 
(respectively defined/matched by
+   * [[FileSourceConstantMetadataStructField]] or 
[[FileSourceGeneratedMetadataAttribute]]).
    *
-   * Constant attributes will be extracted automatically from
-   * [[PartitionedFile.extraConstantMetadataColumnValues]], while generated 
metadata columns always
-   * map to some hidden/internal column the underslying reader provides.
+   * Constant metadata columns are derived from the [[PartitionedFile]] 
instances a scan's
+   * [[FileIndex]] provides. Thus, a custom [[FileFormat]] that defines 
constant metadata columns
+   * will generally pair with a custom [[FileIndex]] that populates 
[[PartitionedFile]] with
+   * appropriate metadata values. By default, constant attribute values are 
obtained by a simple
+   * name-based lookup in 
[[PartitionedFile.extraConstantMetadataColumnValues]], but implementations
+   * can override [[fileConstantMetadataExtractors]] to define custom 
extractors that have access to
+   * the entire [[PartitionedFile]] when deriving the column's value.
    *
-   * NOTE: It is not possible to change the semantics of the base metadata 
fields by overriding this
-   * method. Technically, a file format could choose suppress them, but that 
is not recommended.
+   * Generated metadata columns map to a hidden/internal column the underlying 
reader provides, and
+   * so will often pair with a custom reader that can populate those columns. 
For example,
+   * [[ParquetFileFormat]] defines a "_metadata.row_index" column that relies 
on
+   * [[VectorizedParquetRecordReader]] to extract the actual row index values 
from the parquet scan.
    */
   def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS
+
+  /**
+   * The extractors to use when deriving file-constant metadata columns for 
this file format.
+   *
+   * Implementations that define custom constant metadata columns can override 
this method to
+   * associate a custom extractor with a given metadata column name, when a 
simple name-based lookup
+   * in [[PartitionedFile.extraConstantMetadataColumnValues]] is not 
expressive enough; extractors
+   * have access to the entire [[PartitionedFile]] and can perform arbitrary 
computations.
+   *
+   * NOTE: Extractors are lazy, invoked only if the query actually selects 
their column at runtime.
+   *
+   * See also [[FileFormat.getFileConstantMetadataColumnValue]].
+   */
+  def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] =
+    FileFormat.BASE_METADATA_EXTRACTORS
 }
 
 object FileFormat {
@@ -241,47 +260,74 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_BLOCK_LENGTH, LongType, 
nullable = false),
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, 
TimestampType, nullable = false))
 
+  /**
+   * All [[BASE_METADATA_FIELDS]] require custom extractors because they are 
derived directly from
+   * fields of the [[PartitionedFile]], and do not have entries in the file's 
metadata map.
+   */
+  val BASE_METADATA_EXTRACTORS: Map[String, PartitionedFile => Any] = Map(
+    FILE_PATH -> { pf: PartitionedFile => pf.toPath.toString },
+    FILE_NAME -> { pf: PartitionedFile => pf.toPath.getName },
+    FILE_SIZE -> { pf: PartitionedFile => pf.fileSize },
+    FILE_BLOCK_START -> { pf: PartitionedFile => pf.start },
+    FILE_BLOCK_LENGTH -> { pf: PartitionedFile => pf.length },
+    // The modificationTime from the file has millisecond granularity, but the 
TimestampType for
+    // `file_modification_time` has microsecond granularity.
+    FILE_MODIFICATION_TIME -> { pf: PartitionedFile => pf.modificationTime * 
1000 }
+  )
+
+  /**
+   * Extracts the [[Literal]] value of a file-constant metadata column from a 
[[PartitionedFile]].
+   *
+   * If an extractor is available, apply it. Otherwise, look up the column's 
name in the file's
+   * column value map and return the result (or null, if not found).
+   *
+   * Raw values (including null) are automatically converted to literals as a 
courtesy.
+   */
+  def getFileConstantMetadataColumnValue(
+      name: String,
+      file: PartitionedFile,
+      metadataExtractors: Map[String, PartitionedFile => Any]): Literal = {
+    val extractor = metadataExtractors.get(name).getOrElse {
+      pf: PartitionedFile => 
pf.otherConstantMetadataColumnValues.get(name).orNull
+    }
+    Literal(extractor.apply(file))
+  }
+
   // create an internal row given required metadata fields and file information
   def createMetadataInternalRow(
+      partitionValues: InternalRow,
       fieldNames: Seq[String],
-      filePath: Path,
+      filePath: SparkPath,
       fileSize: Long,
       fileModificationTime: Long): InternalRow = {
-    // We are not aware of `FILE_BLOCK_START` and `FILE_BLOCK_LENGTH` before 
splitting files
-    assert(!fieldNames.contains(FILE_BLOCK_START) && 
!fieldNames.contains(FILE_BLOCK_LENGTH))
-    updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), 
fieldNames,
-      filePath, fileSize, 0L, fileSize, fileModificationTime, Map.empty)
+    // When scanning files directly from the filesystem, we only support 
file-constant metadata
+    // fields whose values can be derived from a file status. In particular, 
we don't have accurate
+    // file split information yet, nor do we have a way to provide custom 
metadata column values.
+    val validFieldNames = Set(FILE_PATH, FILE_NAME, FILE_SIZE, 
FILE_MODIFICATION_TIME)
+    val extractors = 
FileFormat.BASE_METADATA_EXTRACTORS.filterKeys(validFieldNames.contains).toMap
+    assert(fieldNames.forall(validFieldNames.contains))
+    val pf = PartitionedFile(
+      partitionValues = partitionValues,
+      filePath = filePath,
+      start = 0L,
+      length = fileSize,
+      locations = Array.empty,
+      modificationTime = fileModificationTime,
+      fileSize = fileSize,
+      otherConstantMetadataColumnValues = Map.empty)
+    updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), 
fieldNames, pf, extractors)
   }
 
   // update an internal row given required metadata fields and file information
   def updateMetadataInternalRow(
       row: InternalRow,
       fieldNames: Seq[String],
-      filePath: Path,
-      fileSize: Long,
-      fileBlockStart: Long,
-      fileBlockLength: Long,
-      fileModificationTime: Long,
-      otherConstantMetadataColumnValues: Map[String, Any]): InternalRow = {
+      file: PartitionedFile,
+      metadataExtractors: Map[String, PartitionedFile => Any]): InternalRow = {
     fieldNames.zipWithIndex.foreach { case (name, i) =>
-      name match {
-        // NOTE: The base metadata fields are hard-wired here and cannot be 
overridden.
-        case FILE_PATH => row.update(i, 
UTF8String.fromString(filePath.toString))
-        case FILE_NAME => row.update(i, 
UTF8String.fromString(filePath.getName))
-        case FILE_SIZE => row.update(i, fileSize)
-        case FILE_BLOCK_START => row.update(i, fileBlockStart)
-        case FILE_BLOCK_LENGTH => row.update(i, fileBlockLength)
-        case FILE_MODIFICATION_TIME =>
-          // the modificationTime from the file is in millisecond,
-          // while internally, the TimestampType `file_modification_time` is 
stored in microsecond
-          row.update(i, fileModificationTime * 1000L)
-        case other =>
-          // Other metadata columns use the file-provided value (if any). 
Automatically convert raw
-          // values (including nulls) to literals as a courtesy.
-          Literal(otherConstantMetadataColumnValues.get(other).orNull) match {
-            case Literal(null, _) => row.setNullAt(i)
-            case literal => row.update(i, literal.value)
-          }
+      getFileConstantMetadataColumnValue(name, file, metadataExtractors) match 
{
+        case Literal(null, _) => row.setNullAt(i)
+        case literal => row.update(i, literal.value)
       }
     }
     row
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 706228e656b..0cca51cf4e3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -31,13 +31,13 @@ import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GenericInternalRow, JoinedRow, Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.types.PhysicalDataType
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.FileFormat._
 import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, 
ConstantColumnVector}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator
 
 /**
@@ -81,6 +81,7 @@ class FileScanRDD(
     @transient val filePartitions: Seq[FilePartition],
     val readSchema: StructType,
     val metadataColumns: Seq[AttributeReference] = Seq.empty,
+    metadataExtractors: Map[String, PartitionedFile => Any] = Map.empty,
     options: FileSourceOptions = new 
FileSourceOptions(CaseInsensitiveMap(Map.empty)))
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
@@ -149,65 +150,28 @@ class FileScanRDD(
        */
       private def updateMetadataRow(): Unit =
         if (metadataColumns.nonEmpty && currentFile != null) {
-          updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
-            currentFile.toPath, currentFile.fileSize, currentFile.start, 
currentFile.length,
-            currentFile.modificationTime, 
currentFile.otherConstantMetadataColumnValues)
+          updateMetadataInternalRow(
+            metadataRow, metadataColumns.map(_.name), currentFile, 
metadataExtractors)
         }
 
       /**
        * Create an array of constant column vectors containing all required 
metadata columns
        */
       private def createMetadataColumnVector(c: ColumnarBatch): 
Array[ColumnVector] = {
-        val path = currentFile.toPath
-        lazy val tmpRow = new GenericInternalRow(1) // for populating custom 
metadata fields
-        metadataColumns.map(a => (a.name, a.dataType)).map {
-          case (FILE_PATH, dataType) =>
-            require(dataType == StringType)
-            val columnVector = new ConstantColumnVector(c.numRows(), 
StringType)
-            columnVector.setUtf8String(UTF8String.fromString(path.toString))
-            columnVector
-          case (FILE_NAME, dataType) =>
-            require(dataType == StringType)
-            val columnVector = new ConstantColumnVector(c.numRows(), 
StringType)
-            columnVector.setUtf8String(UTF8String.fromString(path.getName))
-            columnVector
-          case (FILE_SIZE, dataType) =>
-            require(dataType == LongType)
-            val columnVector = new ConstantColumnVector(c.numRows(), LongType)
-            columnVector.setLong(currentFile.fileSize)
-            columnVector
-          case (FILE_BLOCK_START, dataType) =>
-            require(dataType == LongType)
-            val columnVector = new ConstantColumnVector(c.numRows(), LongType)
-            columnVector.setLong(currentFile.start)
-            columnVector
-          case (FILE_BLOCK_LENGTH, dataType) =>
-            require(dataType == LongType)
-            val columnVector = new ConstantColumnVector(c.numRows(), LongType)
-            columnVector.setLong(currentFile.length)
-            columnVector
-          case (FILE_MODIFICATION_TIME, dataType) =>
-            require(dataType == TimestampType)
-            val columnVector = new ConstantColumnVector(c.numRows(), LongType)
-            // the modificationTime from the file is in millisecond,
-            // while internally, the TimestampType is stored in microsecond
-            columnVector.setLong(currentFile.modificationTime * 1000L)
-            columnVector
-          case (other, dataType: DataType) =>
-            // Other metadata columns use the file-provided value, if one 
exists. Automatically
-            // convert raw values (including nulls) to literals as a courtesy, 
then populate the
-            // column by passing the resulting value through the `tmpRow` we 
allocated above.
-            
Literal(currentFile.otherConstantMetadataColumnValues.get(other).orNull) match {
-              case Literal(null, _) =>
-                tmpRow.setNullAt(0)
-              case literal =>
-                require(dataType == literal.dataType)
-                tmpRow.update(0, literal.value)
-            }
+        val tmpRow = new GenericInternalRow(1)
+        metadataColumns.map { attr =>
+          // Populate each metadata column by passing the resulting value 
through `tmpRow`.
+          getFileConstantMetadataColumnValue(attr.name, currentFile, 
metadataExtractors) match {
+            case Literal(null, _) =>
+              tmpRow.setNullAt(0)
+            case literal =>
+              require(PhysicalDataType(attr.dataType) == 
PhysicalDataType(literal.dataType))
+              tmpRow.update(0, literal.value)
+          }
 
-            val columnVector = new ConstantColumnVector(c.numRows(), dataType)
-            ColumnVectorUtils.populate(columnVector, tmpRow, 0)
-            columnVector
+          val columnVector = new ConstantColumnVector(c.numRows(), 
attr.dataType)
+          ColumnVectorUtils.populate(columnVector, tmpRow, 0)
+          columnVector
         }.toArray
       }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 2aa5d30946c..b25162aad9a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -102,19 +102,19 @@ abstract class PartitioningAwareFileIndex(
       })
     }
 
-    def matchFileMetadataPredicate(f: FileStatus): Boolean = {
+    def matchFileMetadataPredicate(partitionValues: InternalRow, f: 
FileStatus): Boolean = {
       // use option.forall, so if there is no filter no metadata struct, 
return true
       boundedFilterMetadataStructOpt.forall { boundedFilter =>
         val row =
-          createMetadataInternalRow(requiredMetadataColumnNames.toSeq,
-            f.getPath, f.getLen, f.getModificationTime)
+          createMetadataInternalRow(partitionValues, 
requiredMetadataColumnNames.toSeq,
+            SparkPath.fromFileStatus(f), f.getLen, f.getModificationTime)
         boundedFilter.eval(row)
       }
     }
 
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       PartitionDirectory(InternalRow.empty, allFiles().toArray
-        .filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: 
Nil
+        .filter(f => isNonEmptyFile(f) && 
matchFileMetadataPredicate(InternalRow.empty, f))) :: Nil
     } else {
       if (recursiveFileLookup) {
         throw new IllegalArgumentException(
@@ -126,7 +126,7 @@ abstract class PartitioningAwareFileIndex(
             case Some(existingDir) =>
               // Directory has children files in it, return them
               existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) 
&&
-                matchFileMetadataPredicate(f))
+                matchFileMetadataPredicate(values, f))
 
             case None =>
               // Directory does not exist, or has no children files
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
index 210535ba62b..05872d41131 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -134,6 +134,39 @@ class FileSourceCustomMetadataStructSuite extends 
QueryTest with SharedSparkSess
     }
   }
 
+  test("[SPARK-43226] extra constant metadata fields with extractors") {
+    withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) =>
+      val format = new TestFileFormat(extraConstantMetadataFields) {
+        val extractPartitionNumber = { pf: PartitionedFile =>
+          pf.toPath.toString.split("/").collectFirst {
+            case "f0" => 9990
+            case "f1" => 9991
+          }.get
+        }
+        val extractPartitionName = { pf: PartitionedFile =>
+          pf.toPath.toString.split("/").collectFirst {
+            case "f0" => "f0f"
+            case "f1" => "f1f"
+          }.get
+        }
+        override def fileConstantMetadataExtractors: Map[String, 
PartitionedFile => Any] = {
+          super.fileConstantMetadataExtractors ++ Map(
+            "foo" -> extractPartitionNumber, "bar" -> extractPartitionName)
+        }
+      }
+      val files = Seq(FileStatusWithMetadata(f0), FileStatusWithMetadata(f1))
+      val df = createDF(format, files)
+
+      checkAnswer(
+        df.select("fileNum", "x", "_metadata.row_index", "_metadata.foo", 
"_metadata.bar"),
+        Seq(
+          Row(0, 101L, 0L, 9990, "f0f"),
+          Row(0, 102L, 1L, 9990, "f0f"),
+          Row(1, 111L, 0L, 9991, "f1f"),
+          Row(1, 112L, 1L, 9991, "f1f")))
+    }
+  }
+
   test("filters and projections on extra constant metadata fields") {
     withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) =>
       val format = new TestFileFormat(extraConstantMetadataFields)
@@ -302,7 +335,7 @@ class FileSourceCustomMetadataStructSuite extends QueryTest 
with SharedSparkSess
     }
   }
 
-  test("cannot override base metadata fields") {
+  test("generated columns and extractors take precedence over metadata map 
values") {
     withTempData("parquet", FILE_SCHEMA) { (_, f0, f1) =>
       import FileFormat.{FILE_NAME, FILE_SIZE}
       import ParquetFileFormat.ROW_INDEX


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to