This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new e6e99f016b [GH-2651] Add _metadata hidden column support for GeoPackage DataSource V2 reader (#2654)
e6e99f016b is described below

commit e6e99f016b4bbf2c435a154ac53e9dad31f379b3
Author: Jia Yu <[email protected]>
AuthorDate: Sun Feb 15 00:57:55 2026 -0700

    [GH-2651] Add _metadata hidden column support for GeoPackage DataSource V2 reader (#2654)
---
 .../GeoPackagePartitionReaderFactory.scala         | 55 +++++++++++++++-
 .../datasources/geopackage/GeoPackageScan.scala    | 11 +++-
 .../geopackage/GeoPackageScanBuilder.scala         | 13 ++++
 .../datasources/geopackage/GeoPackageTable.scala   | 26 +++++++-
 .../apache/sedona/sql/GeoPackageReaderTest.scala   | 74 +++++++++++++++++++++-
 .../GeoPackagePartitionReaderFactory.scala         | 55 +++++++++++++++-
 .../datasources/geopackage/GeoPackageScan.scala    | 11 +++-
 .../geopackage/GeoPackageScanBuilder.scala         | 13 ++++
 .../datasources/geopackage/GeoPackageTable.scala   | 25 +++++++-
 .../apache/sedona/sql/GeoPackageReaderTest.scala   | 74 +++++++++++++++++++++-
 .../GeoPackagePartitionReaderFactory.scala         | 55 +++++++++++++++-
 .../datasources/geopackage/GeoPackageScan.scala    | 11 +++-
 .../geopackage/GeoPackageScanBuilder.scala         | 13 ++++
 .../datasources/geopackage/GeoPackageTable.scala   | 25 +++++++-
 .../apache/sedona/sql/GeoPackageReaderTest.scala   | 74 +++++++++++++++++++++-
 .../GeoPackagePartitionReaderFactory.scala         | 55 +++++++++++++++-
 .../datasources/geopackage/GeoPackageScan.scala    | 11 +++-
 .../geopackage/GeoPackageScanBuilder.scala         | 13 ++++
 .../datasources/geopackage/GeoPackageTable.scala   | 25 +++++++-
 .../apache/sedona/sql/GeoPackageReaderTest.scala   | 74 +++++++++++++++++++++-
 20 files changed, 689 insertions(+), 24 deletions(-)

diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 case class GeoPackagePartitionReaderFactory(
     sparkSession: SparkSession,
     broadcastedConf: Broadcast[SerializableConfiguration],
     loadOptions: GeoPackageOptions,
-    dataSchema: StructType)
+    dataSchema: StructType,
+    metadataSchema: StructType)
     extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
       case _ => None
     }
 
-    GeoPackagePartitionReader(
+    val baseReader = GeoPackagePartitionReader(
       rs = rs,
       options = GeoPackageReadOptions(
         tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
       broadcastedConf = broadcastedConf,
       currentTempFile = tempFile,
       copying = copied)
+
+    if (metadataSchema.nonEmpty) {
+      val gpkgFile = partitionFiles.head
+      val filePath = gpkgFile.filePath.toString
+      val fileName = new Path(filePath).getName
+
+      val allMetadataValues: Map[String, Any] = Map(
+        "file_path" -> UTF8String.fromString(filePath),
+        "file_name" -> UTF8String.fromString(fileName),
+        "file_size" -> gpkgFile.fileSize,
+        "file_block_start" -> gpkgFile.start,
+        "file_block_length" -> gpkgFile.length,
+        "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+      val innerStructType = 
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+      val prunedValues = innerStructType.fields.map(f => 
allMetadataValues(f.name))
+      val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+      val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+      new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, 
metadataRow)
+    } else {
+      baseReader
+    }
   }
 }
+
+private[geopackage] class PartitionReaderWithMetadata(
+    reader: PartitionReader[InternalRow],
+    baseSchema: StructType,
+    metadataSchema: StructType,
+    metadataValues: InternalRow)
+    extends PartitionReader[InternalRow] {
+
+  private val joinedRow = new JoinedRow()
+  private val unsafeProjection =
+    GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { 
case (f, i) =>
+      BoundReference(i, f.dataType, f.nullable)
+    } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+      BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+    })
+
+  override def next(): Boolean = reader.next()
+
+  override def get(): InternalRow = {
+    unsafeProjection(joinedRow(reader.get(), metadataValues))
+  }
+
+  override def close(): Unit = reader.close()
+}
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
+    metadataSchema: StructType,
     options: CaseInsensitiveStringMap,
     loadOptions: GeoPackageOptions)
     extends FileScan {
 
+  override def readSchema(): StructType =
+    StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ 
metadataSchema.fields)
+
   override def partitionFilters: Seq[Expression] = {
     Seq.empty
   }
@@ -54,6 +58,11 @@ case class GeoPackageScan(
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-    GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, 
loadOptions, dataSchema)
+    GeoPackagePartitionReaderFactory(
+      sparkSession,
+      broadcastedConf,
+      loadOptions,
+      dataSchema,
+      metadataSchema)
   }
 }
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index a9674395b4..d363406de1 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
     userDefinedSchema: Option[StructType] = None)
     extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
+  private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val resolver = sparkSession.sessionState.conf.resolver
+    val metaFields = requiredSchema.fields.filter { field =>
+      !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+      !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, 
field.name))
+    }
+    _requiredMetadataSchema = StructType(metaFields)
+    super.pruneColumns(requiredSchema)
+  }
+
   override def build(): Scan = {
     val paths = fileIndex.allFiles().map(_.getPath.toString)
 
@@ -54,6 +66,7 @@ class GeoPackageScanBuilder(
       fileIndexAdjusted,
       dataSchema,
       readPartitionSchema(),
+      _requiredMetadataSchema,
       options,
       loadOptions)
   }
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 999aa81280..498933de30 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
 import 
org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, 
GeoPackageConnectionManager}
 import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, 
MetadataSchema, TableType}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
@@ -40,7 +41,8 @@ case class GeoPackageTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
     loadOptions: GeoPackageOptions)
-    extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+    extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+    with SupportsMetadataColumns {
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
     "GeoPackage"
   }
 
+  override def metadataColumns(): Array[MetadataColumn] = 
GeoPackageTable.fileMetadataColumns
+
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
     new GeoPackageScanBuilder(
       sparkSession,
@@ -88,3 +92,21 @@ case class GeoPackageTable(
     null
   }
 }
+
+object GeoPackageTable {
+
+  private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+    Seq(
+      StructField("file_path", StringType, nullable = false),
+      StructField("file_name", StringType, nullable = false),
+      StructField("file_size", LongType, nullable = false),
+      StructField("file_block_start", LongType, nullable = false),
+      StructField("file_block_length", LongType, nullable = false),
+      StructField("file_modification_time", TimestampType, nullable = false)))
+
+  private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = 
Array(new MetadataColumn {
+    override def name: String = "_metadata"
+    override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+    override def isNullable: Boolean = false
+  })
+}
diff --git 
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
 
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 166d8c48db..0443553a86 100644
--- 
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ 
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
 package org.apache.sedona.sql
 
 import io.minio.{MakeBucketArgs, MinioClient}
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types._
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with 
Matchers {
     }
   }
 
+  describe("_metadata hidden column support") {
+    it("should expose _metadata struct with all expected fields") {
+      val df = readFeatureData("point1")
+      val metaDf = df.select("_metadata")
+      val metaSchema = 
metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+      val fieldNames = metaSchema.fieldNames.toSet
+      fieldNames should contain("file_path")
+      fieldNames should contain("file_name")
+      fieldNames should contain("file_size")
+      fieldNames should contain("file_block_start")
+      fieldNames should contain("file_block_length")
+      fieldNames should contain("file_modification_time")
+    }
+
+    it("should not include _metadata in select(*)") {
+      val df = readFeatureData("point1")
+      val starCols = df.select("*").columns.toSet
+      starCols should not contain "_metadata"
+    }
+
+    it("should return correct file_path and file_name in _metadata") {
+      val df = readFeatureData("point1")
+      val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+      val filePath = row.getString(0)
+      val fileName = row.getString(1)
+      filePath should endWith("example.gpkg")
+      fileName shouldEqual "example.gpkg"
+    }
+
+    it("should return actual file_size matching the .gpkg file on disk") {
+      val df = readFeatureData("point1")
+      val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+      val actualFile = new java.io.File(path)
+      metaFileSize shouldEqual actualFile.length()
+    }
+
+    it("should return file_block_start=0 and file_block_length=file_size") {
+      val df = readFeatureData("point1")
+      val row = df
+        .select(
+          "_metadata.file_block_start",
+          "_metadata.file_block_length",
+          "_metadata.file_size")
+        .head()
+      row.getLong(0) shouldEqual 0L
+      row.getLong(1) shouldEqual row.getLong(2)
+    }
+
+    it("should return file_modification_time matching the .gpkg file on disk") 
{
+      val df = readFeatureData("point1")
+      val metaModTime = 
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+      val actualFile = new java.io.File(path)
+      val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+      metaModTime shouldEqual expectedModTime
+    }
+
+    it("should allow filtering on _metadata fields") {
+      val df = readFeatureData("point1")
+      val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+      filtered.count() shouldEqual df.count()
+      val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+      empty.count() shouldEqual 0
+    }
+
+    it("should select _metadata along with data columns") {
+      val df = readFeatureData("point1")
+      val result = df.select("id", "_metadata.file_name").head()
+      result.getInt(0) shouldEqual 1
+      result.getString(1) shouldEqual "example.gpkg"
+    }
+  }
+
   private def readFeatureData(tableName: String): DataFrame = {
     sparkSession.read
       .format("geopackage")
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 case class GeoPackagePartitionReaderFactory(
     sparkSession: SparkSession,
     broadcastedConf: Broadcast[SerializableConfiguration],
     loadOptions: GeoPackageOptions,
-    dataSchema: StructType)
+    dataSchema: StructType,
+    metadataSchema: StructType)
     extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
       case _ => None
     }
 
-    GeoPackagePartitionReader(
+    val baseReader = GeoPackagePartitionReader(
       rs = rs,
       options = GeoPackageReadOptions(
         tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
       broadcastedConf = broadcastedConf,
       currentTempFile = tempFile,
       copying = copied)
+
+    if (metadataSchema.nonEmpty) {
+      val gpkgFile = partitionFiles.head
+      val filePath = gpkgFile.filePath.toString
+      val fileName = new Path(filePath).getName
+
+      val allMetadataValues: Map[String, Any] = Map(
+        "file_path" -> UTF8String.fromString(filePath),
+        "file_name" -> UTF8String.fromString(fileName),
+        "file_size" -> gpkgFile.fileSize,
+        "file_block_start" -> gpkgFile.start,
+        "file_block_length" -> gpkgFile.length,
+        "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+      val innerStructType = 
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+      val prunedValues = innerStructType.fields.map(f => 
allMetadataValues(f.name))
+      val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+      val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+      new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, 
metadataRow)
+    } else {
+      baseReader
+    }
   }
 }
+
+private[geopackage] class PartitionReaderWithMetadata(
+    reader: PartitionReader[InternalRow],
+    baseSchema: StructType,
+    metadataSchema: StructType,
+    metadataValues: InternalRow)
+    extends PartitionReader[InternalRow] {
+
+  private val joinedRow = new JoinedRow()
+  private val unsafeProjection =
+    GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { 
case (f, i) =>
+      BoundReference(i, f.dataType, f.nullable)
+    } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+      BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+    })
+
+  override def next(): Boolean = reader.next()
+
+  override def get(): InternalRow = {
+    unsafeProjection(joinedRow(reader.get(), metadataValues))
+  }
+
+  override def close(): Unit = reader.close()
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
+    metadataSchema: StructType,
     options: CaseInsensitiveStringMap,
     loadOptions: GeoPackageOptions)
     extends FileScan {
 
+  override def readSchema(): StructType =
+    StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ 
metadataSchema.fields)
+
   override def partitionFilters: Seq[Expression] = {
     Seq.empty
   }
@@ -54,6 +58,11 @@ case class GeoPackageScan(
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-    GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, 
loadOptions, dataSchema)
+    GeoPackagePartitionReaderFactory(
+      sparkSession,
+      broadcastedConf,
+      loadOptions,
+      dataSchema,
+      metadataSchema)
   }
 }
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index 829bd9c220..7fdb716f29 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
     userDefinedSchema: Option[StructType] = None)
     extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
+  private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val resolver = sparkSession.sessionState.conf.resolver
+    val metaFields = requiredSchema.fields.filter { field =>
+      !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+      !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, 
field.name))
+    }
+    _requiredMetadataSchema = StructType(metaFields)
+    super.pruneColumns(requiredSchema)
+  }
+
   override def build(): Scan = {
     val fileIndexAdjusted =
       if (loadOptions.showMetadata)
@@ -52,6 +64,7 @@ class GeoPackageScanBuilder(
       fileIndexAdjusted,
       dataSchema,
       readPartitionSchema(),
+      _requiredMetadataSchema,
       options,
       loadOptions)
   }
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..498933de30 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
 import 
org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, 
GeoPackageConnectionManager}
 import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, 
MetadataSchema, TableType}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
@@ -40,7 +41,8 @@ case class GeoPackageTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
     loadOptions: GeoPackageOptions)
-    extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+    extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+    with SupportsMetadataColumns {
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
     "GeoPackage"
   }
 
+  override def metadataColumns(): Array[MetadataColumn] = 
GeoPackageTable.fileMetadataColumns
+
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
     new GeoPackageScanBuilder(
       sparkSession,
@@ -87,5 +91,22 @@ case class GeoPackageTable(
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     null
   }
+}
+
+object GeoPackageTable {
+
+  private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+    Seq(
+      StructField("file_path", StringType, nullable = false),
+      StructField("file_name", StringType, nullable = false),
+      StructField("file_size", LongType, nullable = false),
+      StructField("file_block_start", LongType, nullable = false),
+      StructField("file_block_length", LongType, nullable = false),
+      StructField("file_modification_time", TimestampType, nullable = false)))
 
+  private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = 
Array(new MetadataColumn {
+    override def name: String = "_metadata"
+    override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+    override def isNullable: Boolean = false
+  })
 }
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 66fa147bc5..f37298661f 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
 package org.apache.sedona.sql
 
 import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, 
DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with 
Matchers {
     }
   }
 
+  describe("_metadata hidden column support") {
+    it("should expose _metadata struct with all expected fields") {
+      val df = readFeatureData("point1")
+      val metaDf = df.select("_metadata")
+      val metaSchema = 
metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+      val fieldNames = metaSchema.fieldNames.toSet
+      fieldNames should contain("file_path")
+      fieldNames should contain("file_name")
+      fieldNames should contain("file_size")
+      fieldNames should contain("file_block_start")
+      fieldNames should contain("file_block_length")
+      fieldNames should contain("file_modification_time")
+    }
+
+    it("should not include _metadata in select(*)") {
+      val df = readFeatureData("point1")
+      val starCols = df.select("*").columns.toSet
+      starCols should not contain "_metadata"
+    }
+
+    it("should return correct file_path and file_name in _metadata") {
+      val df = readFeatureData("point1")
+      val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+      val filePath = row.getString(0)
+      val fileName = row.getString(1)
+      filePath should endWith("example.gpkg")
+      fileName shouldEqual "example.gpkg"
+    }
+
+    it("should return actual file_size matching the .gpkg file on disk") {
+      val df = readFeatureData("point1")
+      val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+      val actualFile = new java.io.File(path)
+      metaFileSize shouldEqual actualFile.length()
+    }
+
+    it("should return file_block_start=0 and file_block_length=file_size") {
+      val df = readFeatureData("point1")
+      val row = df
+        .select(
+          "_metadata.file_block_start",
+          "_metadata.file_block_length",
+          "_metadata.file_size")
+        .head()
+      row.getLong(0) shouldEqual 0L
+      row.getLong(1) shouldEqual row.getLong(2)
+    }
+
+    it("should return file_modification_time matching the .gpkg file on disk") 
{
+      val df = readFeatureData("point1")
+      val metaModTime = 
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+      val actualFile = new java.io.File(path)
+      val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+      metaModTime shouldEqual expectedModTime
+    }
+
+    it("should allow filtering on _metadata fields") {
+      val df = readFeatureData("point1")
+      val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+      filtered.count() shouldEqual df.count()
+      val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+      empty.count() shouldEqual 0
+    }
+
+    it("should select _metadata along with data columns") {
+      val df = readFeatureData("point1")
+      val result = df.select("id", "_metadata.file_name").head()
+      result.getInt(0) shouldEqual 1
+      result.getString(1) shouldEqual "example.gpkg"
+    }
+  }
+
   private def readFeatureData(tableName: String): DataFrame = {
     sparkSession.read
       .format("geopackage")
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 case class GeoPackagePartitionReaderFactory(
     sparkSession: SparkSession,
     broadcastedConf: Broadcast[SerializableConfiguration],
     loadOptions: GeoPackageOptions,
-    dataSchema: StructType)
+    dataSchema: StructType,
+    metadataSchema: StructType)
     extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
       case _ => None
     }
 
-    GeoPackagePartitionReader(
+    val baseReader = GeoPackagePartitionReader(
       rs = rs,
       options = GeoPackageReadOptions(
         tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
       broadcastedConf = broadcastedConf,
       currentTempFile = tempFile,
       copying = copied)
+
+    if (metadataSchema.nonEmpty) {
+      val gpkgFile = partitionFiles.head
+      val filePath = gpkgFile.filePath.toString
+      val fileName = new Path(filePath).getName
+
+      val allMetadataValues: Map[String, Any] = Map(
+        "file_path" -> UTF8String.fromString(filePath),
+        "file_name" -> UTF8String.fromString(fileName),
+        "file_size" -> gpkgFile.fileSize,
+        "file_block_start" -> gpkgFile.start,
+        "file_block_length" -> gpkgFile.length,
+        "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+      val innerStructType = 
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+      val prunedValues = innerStructType.fields.map(f => 
allMetadataValues(f.name))
+      val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+      val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+      new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, 
metadataRow)
+    } else {
+      baseReader
+    }
   }
 }
+
+private[geopackage] class PartitionReaderWithMetadata(
+    reader: PartitionReader[InternalRow],
+    baseSchema: StructType,
+    metadataSchema: StructType,
+    metadataValues: InternalRow)
+    extends PartitionReader[InternalRow] {
+
+  private val joinedRow = new JoinedRow()
+  private val unsafeProjection =
+    GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { 
case (f, i) =>
+      BoundReference(i, f.dataType, f.nullable)
+    } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+      BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+    })
+
+  override def next(): Boolean = reader.next()
+
+  override def get(): InternalRow = {
+    unsafeProjection(joinedRow(reader.get(), metadataValues))
+  }
+
+  override def close(): Unit = reader.close()
+}
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
+    metadataSchema: StructType,
     options: CaseInsensitiveStringMap,
     loadOptions: GeoPackageOptions)
     extends FileScan {
 
+  override def readSchema(): StructType =
+    StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ 
metadataSchema.fields)
+
   override def partitionFilters: Seq[Expression] = {
     Seq.empty
   }
@@ -54,6 +58,11 @@ case class GeoPackageScan(
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-    GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, 
loadOptions, dataSchema)
+    GeoPackagePartitionReaderFactory(
+      sparkSession,
+      broadcastedConf,
+      loadOptions,
+      dataSchema,
+      metadataSchema)
   }
 }
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index 829bd9c220..7fdb716f29 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
     userDefinedSchema: Option[StructType] = None)
     extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
+  private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val resolver = sparkSession.sessionState.conf.resolver
+    val metaFields = requiredSchema.fields.filter { field =>
+      !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+      !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, 
field.name))
+    }
+    _requiredMetadataSchema = StructType(metaFields)
+    super.pruneColumns(requiredSchema)
+  }
+
   override def build(): Scan = {
     val fileIndexAdjusted =
       if (loadOptions.showMetadata)
@@ -52,6 +64,7 @@ class GeoPackageScanBuilder(
       fileIndexAdjusted,
       dataSchema,
       readPartitionSchema(),
+      _requiredMetadataSchema,
       options,
       loadOptions)
   }
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..498933de30 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
 import 
org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, 
GeoPackageConnectionManager}
 import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, 
MetadataSchema, TableType}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
@@ -40,7 +41,8 @@ case class GeoPackageTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
     loadOptions: GeoPackageOptions)
-    extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+    extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+    with SupportsMetadataColumns {
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
     "GeoPackage"
   }
 
+  override def metadataColumns(): Array[MetadataColumn] = 
GeoPackageTable.fileMetadataColumns
+
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
     new GeoPackageScanBuilder(
       sparkSession,
@@ -87,5 +91,22 @@ case class GeoPackageTable(
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     null
   }
+}
+
+object GeoPackageTable {
+
+  private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+    Seq(
+      StructField("file_path", StringType, nullable = false),
+      StructField("file_name", StringType, nullable = false),
+      StructField("file_size", LongType, nullable = false),
+      StructField("file_block_start", LongType, nullable = false),
+      StructField("file_block_length", LongType, nullable = false),
+      StructField("file_modification_time", TimestampType, nullable = false)))
 
+  private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = 
Array(new MetadataColumn {
+    override def name: String = "_metadata"
+    override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+    override def isNullable: Boolean = false
+  })
 }
diff --git 
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
 
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 66fa147bc5..f37298661f 100644
--- 
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ 
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
 package org.apache.sedona.sql
 
 import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, 
DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with 
Matchers {
     }
   }
 
+  describe("_metadata hidden column support") {
+    it("should expose _metadata struct with all expected fields") {
+      val df = readFeatureData("point1")
+      val metaDf = df.select("_metadata")
+      val metaSchema = 
metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+      val fieldNames = metaSchema.fieldNames.toSet
+      fieldNames should contain("file_path")
+      fieldNames should contain("file_name")
+      fieldNames should contain("file_size")
+      fieldNames should contain("file_block_start")
+      fieldNames should contain("file_block_length")
+      fieldNames should contain("file_modification_time")
+    }
+
+    it("should not include _metadata in select(*)") {
+      val df = readFeatureData("point1")
+      val starCols = df.select("*").columns.toSet
+      starCols should not contain "_metadata"
+    }
+
+    it("should return correct file_path and file_name in _metadata") {
+      val df = readFeatureData("point1")
+      val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+      val filePath = row.getString(0)
+      val fileName = row.getString(1)
+      filePath should endWith("example.gpkg")
+      fileName shouldEqual "example.gpkg"
+    }
+
+    it("should return actual file_size matching the .gpkg file on disk") {
+      val df = readFeatureData("point1")
+      val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+      val actualFile = new java.io.File(path)
+      metaFileSize shouldEqual actualFile.length()
+    }
+
+    it("should return file_block_start=0 and file_block_length=file_size") {
+      val df = readFeatureData("point1")
+      val row = df
+        .select(
+          "_metadata.file_block_start",
+          "_metadata.file_block_length",
+          "_metadata.file_size")
+        .head()
+      row.getLong(0) shouldEqual 0L
+      row.getLong(1) shouldEqual row.getLong(2)
+    }
+
+    it("should return file_modification_time matching the .gpkg file on disk") 
{
+      val df = readFeatureData("point1")
+      val metaModTime = 
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+      val actualFile = new java.io.File(path)
+      val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+      metaModTime shouldEqual expectedModTime
+    }
+
+    it("should allow filtering on _metadata fields") {
+      val df = readFeatureData("point1")
+      val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+      filtered.count() shouldEqual df.count()
+      val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+      empty.count() shouldEqual 0
+    }
+
+    it("should select _metadata along with data columns") {
+      val df = readFeatureData("point1")
+      val result = df.select("id", "_metadata.file_name").head()
+      result.getInt(0) shouldEqual 1
+      result.getString(1) shouldEqual "example.gpkg"
+    }
+  }
+
   private def readFeatureData(tableName: String): DataFrame = {
     sparkSession.read
       .format("geopackage")
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 case class GeoPackagePartitionReaderFactory(
     sparkSession: SparkSession,
     broadcastedConf: Broadcast[SerializableConfiguration],
     loadOptions: GeoPackageOptions,
-    dataSchema: StructType)
+    dataSchema: StructType,
+    metadataSchema: StructType)
     extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
       case _ => None
     }
 
-    GeoPackagePartitionReader(
+    val baseReader = GeoPackagePartitionReader(
       rs = rs,
       options = GeoPackageReadOptions(
         tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
       broadcastedConf = broadcastedConf,
       currentTempFile = tempFile,
       copying = copied)
+
+    if (metadataSchema.nonEmpty) {
+      val gpkgFile = partitionFiles.head
+      val filePath = gpkgFile.filePath.toString
+      val fileName = new Path(filePath).getName
+
+      val allMetadataValues: Map[String, Any] = Map(
+        "file_path" -> UTF8String.fromString(filePath),
+        "file_name" -> UTF8String.fromString(fileName),
+        "file_size" -> gpkgFile.fileSize,
+        "file_block_start" -> gpkgFile.start,
+        "file_block_length" -> gpkgFile.length,
+        "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+      val innerStructType = 
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+      val prunedValues = innerStructType.fields.map(f => 
allMetadataValues(f.name))
+      val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+      val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+      new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, 
metadataRow)
+    } else {
+      baseReader
+    }
   }
 }
+
+private[geopackage] class PartitionReaderWithMetadata(
+    reader: PartitionReader[InternalRow],
+    baseSchema: StructType,
+    metadataSchema: StructType,
+    metadataValues: InternalRow)
+    extends PartitionReader[InternalRow] {
+
+  private val joinedRow = new JoinedRow()
+  private val unsafeProjection =
+    GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { 
case (f, i) =>
+      BoundReference(i, f.dataType, f.nullable)
+    } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+      BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+    })
+
+  override def next(): Boolean = reader.next()
+
+  override def get(): InternalRow = {
+    unsafeProjection(joinedRow(reader.get(), metadataValues))
+  }
+
+  override def close(): Unit = reader.close()
+}
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
     readPartitionSchema: StructType,
+    metadataSchema: StructType,
     options: CaseInsensitiveStringMap,
     loadOptions: GeoPackageOptions)
     extends FileScan {
 
+  override def readSchema(): StructType =
+    StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ 
metadataSchema.fields)
+
   override def partitionFilters: Seq[Expression] = {
     Seq.empty
   }
@@ -54,6 +58,11 @@ case class GeoPackageScan(
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-    GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, 
loadOptions, dataSchema)
+    GeoPackagePartitionReaderFactory(
+      sparkSession,
+      broadcastedConf,
+      loadOptions,
+      dataSchema,
+      metadataSchema)
   }
 }
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index 829bd9c220..7fdb716f29 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
     userDefinedSchema: Option[StructType] = None)
     extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
+  private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val resolver = sparkSession.sessionState.conf.resolver
+    val metaFields = requiredSchema.fields.filter { field =>
+      !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+      !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, 
field.name))
+    }
+    _requiredMetadataSchema = StructType(metaFields)
+    super.pruneColumns(requiredSchema)
+  }
+
   override def build(): Scan = {
     val fileIndexAdjusted =
       if (loadOptions.showMetadata)
@@ -52,6 +64,7 @@ class GeoPackageScanBuilder(
       fileIndexAdjusted,
       dataSchema,
       readPartitionSchema(),
+      _requiredMetadataSchema,
       options,
       loadOptions)
   }
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..498933de30 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
 import 
org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, 
GeoPackageConnectionManager}
 import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, 
MetadataSchema, TableType}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
@@ -40,7 +41,8 @@ case class GeoPackageTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat],
     loadOptions: GeoPackageOptions)
-    extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+    extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+    with SupportsMetadataColumns {
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
     "GeoPackage"
   }
 
+  override def metadataColumns(): Array[MetadataColumn] = 
GeoPackageTable.fileMetadataColumns
+
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
     new GeoPackageScanBuilder(
       sparkSession,
@@ -87,5 +91,22 @@ case class GeoPackageTable(
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     null
   }
+}
+
+object GeoPackageTable {
+
+  private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+    Seq(
+      StructField("file_path", StringType, nullable = false),
+      StructField("file_name", StringType, nullable = false),
+      StructField("file_size", LongType, nullable = false),
+      StructField("file_block_start", LongType, nullable = false),
+      StructField("file_block_length", LongType, nullable = false),
+      StructField("file_modification_time", TimestampType, nullable = false)))
 
+  private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = 
Array(new MetadataColumn {
+    override def name: String = "_metadata"
+    override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+    override def isNullable: Boolean = false
+  })
 }
diff --git 
a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
 
b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 66fa147bc5..f37298661f 100644
--- 
a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ 
b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
 package org.apache.sedona.sql
 
 import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, 
DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with 
Matchers {
     }
   }
 
+  describe("_metadata hidden column support") {
+    it("should expose _metadata struct with all expected fields") {
+      val df = readFeatureData("point1")
+      val metaDf = df.select("_metadata")
+      val metaSchema = 
metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+      val fieldNames = metaSchema.fieldNames.toSet
+      fieldNames should contain("file_path")
+      fieldNames should contain("file_name")
+      fieldNames should contain("file_size")
+      fieldNames should contain("file_block_start")
+      fieldNames should contain("file_block_length")
+      fieldNames should contain("file_modification_time")
+    }
+
+    it("should not include _metadata in select(*)") {
+      val df = readFeatureData("point1")
+      val starCols = df.select("*").columns.toSet
+      starCols should not contain "_metadata"
+    }
+
+    it("should return correct file_path and file_name in _metadata") {
+      val df = readFeatureData("point1")
+      val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+      val filePath = row.getString(0)
+      val fileName = row.getString(1)
+      filePath should endWith("example.gpkg")
+      fileName shouldEqual "example.gpkg"
+    }
+
+    it("should return actual file_size matching the .gpkg file on disk") {
+      val df = readFeatureData("point1")
+      val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+      val actualFile = new java.io.File(path)
+      metaFileSize shouldEqual actualFile.length()
+    }
+
+    it("should return file_block_start=0 and file_block_length=file_size") {
+      val df = readFeatureData("point1")
+      val row = df
+        .select(
+          "_metadata.file_block_start",
+          "_metadata.file_block_length",
+          "_metadata.file_size")
+        .head()
+      row.getLong(0) shouldEqual 0L
+      row.getLong(1) shouldEqual row.getLong(2)
+    }
+
+    it("should return file_modification_time matching the .gpkg file on disk") 
{
+      val df = readFeatureData("point1")
+      val metaModTime = 
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+      val actualFile = new java.io.File(path)
+      val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+      metaModTime shouldEqual expectedModTime
+    }
+
+    it("should allow filtering on _metadata fields") {
+      val df = readFeatureData("point1")
+      val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+      filtered.count() shouldEqual df.count()
+      val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+      empty.count() shouldEqual 0
+    }
+
+    it("should select _metadata along with data columns") {
+      val df = readFeatureData("point1")
+      val result = df.select("id", "_metadata.file_name").head()
+      result.getInt(0) shouldEqual 1
+      result.getString(1) shouldEqual "example.gpkg"
+    }
+  }
+
   private def readFeatureData(tableName: String): DataFrame = {
     sparkSession.read
       .format("geopackage")

Reply via email to