This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new e6e99f016b [GH-2651] Add _metadata hidden column support for GeoPackage DataSource V2 reader (#2654)
e6e99f016b is described below
commit e6e99f016b4bbf2c435a154ac53e9dad31f379b3
Author: Jia Yu <[email protected]>
AuthorDate: Sun Feb 15 00:57:55 2026 -0700
[GH-2651] Add _metadata hidden column support for GeoPackage DataSource V2 reader (#2654)
---
.../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++++-
.../datasources/geopackage/GeoPackageScan.scala | 11 +++-
.../geopackage/GeoPackageScanBuilder.scala | 13 ++++
.../datasources/geopackage/GeoPackageTable.scala | 26 +++++++-
.../apache/sedona/sql/GeoPackageReaderTest.scala | 74 +++++++++++++++++++++-
.../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++++-
.../datasources/geopackage/GeoPackageScan.scala | 11 +++-
.../geopackage/GeoPackageScanBuilder.scala | 13 ++++
.../datasources/geopackage/GeoPackageTable.scala | 25 +++++++-
.../apache/sedona/sql/GeoPackageReaderTest.scala | 74 +++++++++++++++++++++-
.../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++++-
.../datasources/geopackage/GeoPackageScan.scala | 11 +++-
.../geopackage/GeoPackageScanBuilder.scala | 13 ++++
.../datasources/geopackage/GeoPackageTable.scala | 25 +++++++-
.../apache/sedona/sql/GeoPackageReaderTest.scala | 74 +++++++++++++++++++++-
.../GeoPackagePartitionReaderFactory.scala | 55 +++++++++++++++-
.../datasources/geopackage/GeoPackageScan.scala | 11 +++-
.../geopackage/GeoPackageScanBuilder.scala | 13 ++++
.../datasources/geopackage/GeoPackageTable.scala | 25 +++++++-
.../apache/sedona/sql/GeoPackageReaderTest.scala | 74 +++++++++++++++++++++-
20 files changed, 689 insertions(+), 24 deletions(-)
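
For context, this change wires the standard Spark file-source `_metadata` hidden column into the GeoPackage reader across all supported Spark versions. A minimal usage sketch (the SparkSession `spark`, file path, and table name below are hypothetical placeholders, not taken from this commit):

    // Hedged usage sketch: _metadata stays hidden until explicitly selected.
    val df = spark.read
      .format("geopackage")
      .option("tableName", "point1") // hypothetical feature table
      .load("/tmp/example.gpkg")     // hypothetical path
    df.printSchema() // _metadata is absent from the default schema
    df.select("id", "_metadata.file_name", "_metadata.file_size").show()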
diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
case class GeoPackagePartitionReaderFactory(
sparkSession: SparkSession,
broadcastedConf: Broadcast[SerializableConfiguration],
loadOptions: GeoPackageOptions,
- dataSchema: StructType)
+ dataSchema: StructType,
+ metadataSchema: StructType)
extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
case _ => None
}
- GeoPackagePartitionReader(
+ val baseReader = GeoPackagePartitionReader(
rs = rs,
options = GeoPackageReadOptions(
tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
broadcastedConf = broadcastedConf,
currentTempFile = tempFile,
copying = copied)
+
+ if (metadataSchema.nonEmpty) {
+ val gpkgFile = partitionFiles.head
+ val filePath = gpkgFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> gpkgFile.fileSize,
+ "file_block_start" -> gpkgFile.start,
+ "file_block_length" -> gpkgFile.length,
+ "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+ val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+ new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow)
+ } else {
+ baseReader
+ }
}
}
+
+private[geopackage] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
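
A note on the wrapper above: PartitionReaderWithMetadata appends one constant per-file metadata row to every data row by pairing the two in a JoinedRow and flattening the pair through a generated UnsafeProjection. The `modificationTime * 1000L` converts the file's millisecond modification time to Catalyst's microsecond TimestampType encoding. A self-contained sketch of the same pattern, with illustrative schemas and values that are not from the commit:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    val baseSchema = StructType(Seq(StructField("id", IntegerType, nullable = false)))
    val metaSchema = StructType(Seq(StructField("file_name", StringType, nullable = false)))

    // Bind every output field to its ordinal in the concatenated row.
    val projection = GenerateUnsafeProjection.generate(
      (baseSchema.fields ++ metaSchema.fields).zipWithIndex.map { case (f, i) =>
        BoundReference(i, f.dataType, f.nullable)
      })

    val dataRow = InternalRow(1)
    val metaRow = InternalRow(UTF8String.fromString("example.gpkg"))
    // JoinedRow presents both rows as one without copying; the projection
    // then materializes a single UnsafeRow per record.
    val out = projection(new JoinedRow(dataRow, metaRow))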
diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
loadOptions: GeoPackageOptions)
extends FileScan {
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)
+
override def partitionFilters: Seq[Expression] = {
Seq.empty
}
@@ -54,6 +58,11 @@ case class GeoPackageScan(
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
+ GeoPackagePartitionReaderFactory(
+ sparkSession,
+ broadcastedConf,
+ loadOptions,
+ dataSchema,
+ metadataSchema)
}
}
diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index a9674395b4..d363406de1 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
userDefinedSchema: Option[StructType] = None)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
val paths = fileIndex.allFiles().map(_.getPath.toString)
@@ -54,6 +66,7 @@ class GeoPackageScanBuilder(
fileIndexAdjusted,
dataSchema,
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
loadOptions)
}
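
The pruneColumns override above classifies a requested field as a metadata column when it resolves to neither the data schema nor the partition schema. A toy demonstration of that filter, with made-up schemas and equalsIgnoreCase standing in for the session resolver:

    import org.apache.spark.sql.types._

    val dataSchema = StructType(Seq(StructField("id", IntegerType), StructField("geom", BinaryType)))
    val partitionSchema = StructType(Nil)
    val requiredSchema = StructType(dataSchema.fields :+ StructField("_metadata", StringType))

    // Stand-in for sparkSession.sessionState.conf.resolver (case-insensitive by default).
    val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)
    val metaFields = requiredSchema.fields.filter { field =>
      !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
      !partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
    }
    assert(metaFields.map(_.name).sameElements(Array("_metadata")))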
diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 999aa81280..498933de30 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
@@ -40,7 +41,8 @@ case class GeoPackageTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
loadOptions: GeoPackageOptions)
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
"GeoPackage"
}
+ override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new GeoPackageScanBuilder(
sparkSession,
@@ -88,3 +92,21 @@ case class GeoPackageTable(
null
}
}
+
+object GeoPackageTable {
+
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+
+ private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
+}
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 166d8c48db..0443553a86 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
package org.apache.sedona.sql
import io.minio.{MakeBucketArgs, MinioClient}
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types._
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers {
}
}
+ describe("_metadata hidden column support") {
+ it("should expose _metadata struct with all expected fields") {
+ val df = readFeatureData("point1")
+ val metaDf = df.select("_metadata")
+ val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+ val fieldNames = metaSchema.fieldNames.toSet
+ fieldNames should contain("file_path")
+ fieldNames should contain("file_name")
+ fieldNames should contain("file_size")
+ fieldNames should contain("file_block_start")
+ fieldNames should contain("file_block_length")
+ fieldNames should contain("file_modification_time")
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = readFeatureData("point1")
+ val starCols = df.select("*").columns.toSet
+ starCols should not contain "_metadata"
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = readFeatureData("point1")
+ val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+ val filePath = row.getString(0)
+ val fileName = row.getString(1)
+ filePath should endWith("example.gpkg")
+ fileName shouldEqual "example.gpkg"
+ }
+
+ it("should return actual file_size matching the .gpkg file on disk") {
+ val df = readFeatureData("point1")
+ val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+ val actualFile = new java.io.File(path)
+ metaFileSize shouldEqual actualFile.length()
+ }
+
+ it("should return file_block_start=0 and file_block_length=file_size") {
+ val df = readFeatureData("point1")
+ val row = df
+ .select(
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_size")
+ .head()
+ row.getLong(0) shouldEqual 0L
+ row.getLong(1) shouldEqual row.getLong(2)
+ }
+
+ it("should return file_modification_time matching the .gpkg file on disk")
{
+ val df = readFeatureData("point1")
+ val metaModTime =
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+ val actualFile = new java.io.File(path)
+ val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+ metaModTime shouldEqual expectedModTime
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = readFeatureData("point1")
+ val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+ filtered.count() shouldEqual df.count()
+ val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+ empty.count() shouldEqual 0
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = readFeatureData("point1")
+ val result = df.select("id", "_metadata.file_name").head()
+ result.getInt(0) shouldEqual 1
+ result.getString(1) shouldEqual "example.gpkg"
+ }
+ }
+
private def readFeatureData(tableName: String): DataFrame = {
sparkSession.read
.format("geopackage")
diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
case class GeoPackagePartitionReaderFactory(
sparkSession: SparkSession,
broadcastedConf: Broadcast[SerializableConfiguration],
loadOptions: GeoPackageOptions,
- dataSchema: StructType)
+ dataSchema: StructType,
+ metadataSchema: StructType)
extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
case _ => None
}
- GeoPackagePartitionReader(
+ val baseReader = GeoPackagePartitionReader(
rs = rs,
options = GeoPackageReadOptions(
tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
broadcastedConf = broadcastedConf,
currentTempFile = tempFile,
copying = copied)
+
+ if (metadataSchema.nonEmpty) {
+ val gpkgFile = partitionFiles.head
+ val filePath = gpkgFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> gpkgFile.fileSize,
+ "file_block_start" -> gpkgFile.start,
+ "file_block_length" -> gpkgFile.length,
+ "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+ val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+ new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow)
+ } else {
+ baseReader
+ }
}
}
+
+private[geopackage] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
loadOptions: GeoPackageOptions)
extends FileScan {
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)
+
override def partitionFilters: Seq[Expression] = {
Seq.empty
}
@@ -54,6 +58,11 @@ case class GeoPackageScan(
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
+ GeoPackagePartitionReaderFactory(
+ sparkSession,
+ broadcastedConf,
+ loadOptions,
+ dataSchema,
+ metadataSchema)
}
}
diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index 829bd9c220..7fdb716f29 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
userDefinedSchema: Option[StructType] = None)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
val fileIndexAdjusted =
if (loadOptions.showMetadata)
@@ -52,6 +64,7 @@ class GeoPackageScanBuilder(
fileIndexAdjusted,
dataSchema,
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
loadOptions)
}
diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..498933de30 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
@@ -40,7 +41,8 @@ case class GeoPackageTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
loadOptions: GeoPackageOptions)
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
"GeoPackage"
}
+ override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new GeoPackageScanBuilder(
sparkSession,
@@ -87,5 +91,22 @@ case class GeoPackageTable(
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
null
}
+}
+
+object GeoPackageTable {
+
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+ private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
}
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 66fa147bc5..f37298661f 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
package org.apache.sedona.sql
import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers {
}
}
+ describe("_metadata hidden column support") {
+ it("should expose _metadata struct with all expected fields") {
+ val df = readFeatureData("point1")
+ val metaDf = df.select("_metadata")
+ val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+ val fieldNames = metaSchema.fieldNames.toSet
+ fieldNames should contain("file_path")
+ fieldNames should contain("file_name")
+ fieldNames should contain("file_size")
+ fieldNames should contain("file_block_start")
+ fieldNames should contain("file_block_length")
+ fieldNames should contain("file_modification_time")
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = readFeatureData("point1")
+ val starCols = df.select("*").columns.toSet
+ starCols should not contain "_metadata"
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = readFeatureData("point1")
+ val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+ val filePath = row.getString(0)
+ val fileName = row.getString(1)
+ filePath should endWith("example.gpkg")
+ fileName shouldEqual "example.gpkg"
+ }
+
+ it("should return actual file_size matching the .gpkg file on disk") {
+ val df = readFeatureData("point1")
+ val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+ val actualFile = new java.io.File(path)
+ metaFileSize shouldEqual actualFile.length()
+ }
+
+ it("should return file_block_start=0 and file_block_length=file_size") {
+ val df = readFeatureData("point1")
+ val row = df
+ .select(
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_size")
+ .head()
+ row.getLong(0) shouldEqual 0L
+ row.getLong(1) shouldEqual row.getLong(2)
+ }
+
+ it("should return file_modification_time matching the .gpkg file on disk")
{
+ val df = readFeatureData("point1")
+ val metaModTime =
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+ val actualFile = new java.io.File(path)
+ val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+ metaModTime shouldEqual expectedModTime
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = readFeatureData("point1")
+ val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+ filtered.count() shouldEqual df.count()
+ val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+ empty.count() shouldEqual 0
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = readFeatureData("point1")
+ val result = df.select("id", "_metadata.file_name").head()
+ result.getInt(0) shouldEqual 1
+ result.getString(1) shouldEqual "example.gpkg"
+ }
+ }
+
private def readFeatureData(tableName: String): DataFrame = {
sparkSession.read
.format("geopackage")
diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
case class GeoPackagePartitionReaderFactory(
sparkSession: SparkSession,
broadcastedConf: Broadcast[SerializableConfiguration],
loadOptions: GeoPackageOptions,
- dataSchema: StructType)
+ dataSchema: StructType,
+ metadataSchema: StructType)
extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
case _ => None
}
- GeoPackagePartitionReader(
+ val baseReader = GeoPackagePartitionReader(
rs = rs,
options = GeoPackageReadOptions(
tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
broadcastedConf = broadcastedConf,
currentTempFile = tempFile,
copying = copied)
+
+ if (metadataSchema.nonEmpty) {
+ val gpkgFile = partitionFiles.head
+ val filePath = gpkgFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> gpkgFile.fileSize,
+ "file_block_start" -> gpkgFile.start,
+ "file_block_length" -> gpkgFile.length,
+ "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+ val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+ new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow)
+ } else {
+ baseReader
+ }
}
}
+
+private[geopackage] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
loadOptions: GeoPackageOptions)
extends FileScan {
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)
+
override def partitionFilters: Seq[Expression] = {
Seq.empty
}
@@ -54,6 +58,11 @@ case class GeoPackageScan(
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
+ GeoPackagePartitionReaderFactory(
+ sparkSession,
+ broadcastedConf,
+ loadOptions,
+ dataSchema,
+ metadataSchema)
}
}
diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index 829bd9c220..7fdb716f29 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
userDefinedSchema: Option[StructType] = None)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
val fileIndexAdjusted =
if (loadOptions.showMetadata)
@@ -52,6 +64,7 @@ class GeoPackageScanBuilder(
fileIndexAdjusted,
dataSchema,
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
loadOptions)
}
diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..498933de30 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
@@ -40,7 +41,8 @@ case class GeoPackageTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
loadOptions: GeoPackageOptions)
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
"GeoPackage"
}
+ override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new GeoPackageScanBuilder(
sparkSession,
@@ -87,5 +91,22 @@ case class GeoPackageTable(
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
null
}
+}
+
+object GeoPackageTable {
+
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+ private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
}
diff --git a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 66fa147bc5..f37298661f 100644
--- a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
package org.apache.sedona.sql
import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers {
}
}
+ describe("_metadata hidden column support") {
+ it("should expose _metadata struct with all expected fields") {
+ val df = readFeatureData("point1")
+ val metaDf = df.select("_metadata")
+ val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+ val fieldNames = metaSchema.fieldNames.toSet
+ fieldNames should contain("file_path")
+ fieldNames should contain("file_name")
+ fieldNames should contain("file_size")
+ fieldNames should contain("file_block_start")
+ fieldNames should contain("file_block_length")
+ fieldNames should contain("file_modification_time")
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = readFeatureData("point1")
+ val starCols = df.select("*").columns.toSet
+ starCols should not contain "_metadata"
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = readFeatureData("point1")
+ val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+ val filePath = row.getString(0)
+ val fileName = row.getString(1)
+ filePath should endWith("example.gpkg")
+ fileName shouldEqual "example.gpkg"
+ }
+
+ it("should return actual file_size matching the .gpkg file on disk") {
+ val df = readFeatureData("point1")
+ val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+ val actualFile = new java.io.File(path)
+ metaFileSize shouldEqual actualFile.length()
+ }
+
+ it("should return file_block_start=0 and file_block_length=file_size") {
+ val df = readFeatureData("point1")
+ val row = df
+ .select(
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_size")
+ .head()
+ row.getLong(0) shouldEqual 0L
+ row.getLong(1) shouldEqual row.getLong(2)
+ }
+
+ it("should return file_modification_time matching the .gpkg file on disk")
{
+ val df = readFeatureData("point1")
+ val metaModTime =
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+ val actualFile = new java.io.File(path)
+ val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+ metaModTime shouldEqual expectedModTime
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = readFeatureData("point1")
+ val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+ filtered.count() shouldEqual df.count()
+ val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+ empty.count() shouldEqual 0
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = readFeatureData("point1")
+ val result = df.select("id", "_metadata.file_name").head()
+ result.getInt(0) shouldEqual 1
+ result.getString(1) shouldEqual "example.gpkg"
+ }
+ }
+
private def readFeatureData(tableName: String): DataFrame = {
sparkSession.read
.format("geopackage")
diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
index b1d38996b0..0f2e9a87b8 100644
--- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
+++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackagePartitionReaderFactory.scala
@@ -25,16 +25,20 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Ge
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
case class GeoPackagePartitionReaderFactory(
sparkSession: SparkSession,
broadcastedConf: Broadcast[SerializableConfiguration],
loadOptions: GeoPackageOptions,
- dataSchema: StructType)
+ dataSchema: StructType,
+ metadataSchema: StructType)
extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
@@ -72,7 +76,7 @@ case class GeoPackagePartitionReaderFactory(
case _ => None
}
- GeoPackagePartitionReader(
+ val baseReader = GeoPackagePartitionReader(
rs = rs,
options = GeoPackageReadOptions(
tableName = loadOptions.tableName,
@@ -84,5 +88,52 @@ case class GeoPackagePartitionReaderFactory(
broadcastedConf = broadcastedConf,
currentTempFile = tempFile,
copying = copied)
+
+ if (metadataSchema.nonEmpty) {
+ val gpkgFile = partitionFiles.head
+ val filePath = gpkgFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> gpkgFile.fileSize,
+ "file_block_start" -> gpkgFile.start,
+ "file_block_length" -> gpkgFile.length,
+ "file_modification_time" -> (gpkgFile.modificationTime * 1000L))
+
+ val innerStructType = metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f => allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+
+ new PartitionReaderWithMetadata(baseReader, dataSchema, metadataSchema, metadataRow)
+ } else {
+ baseReader
+ }
}
}
+
+private[geopackage] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
index 768afd7917..edca3d35ff 100644
--- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
+++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScan.scala
@@ -36,10 +36,14 @@ case class GeoPackageScan(
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
loadOptions: GeoPackageOptions)
extends FileScan {
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++ metadataSchema.fields)
+
override def partitionFilters: Seq[Expression] = {
Seq.empty
}
@@ -54,6 +58,11 @@ case class GeoPackageScan(
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
+ GeoPackagePartitionReaderFactory(
+ sparkSession,
+ broadcastedConf,
+ loadOptions,
+ dataSchema,
+ metadataSchema)
}
}
diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
index 829bd9c220..7fdb716f29 100644
--- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
+++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageScanBuilder.scala
@@ -36,6 +36,18 @@ class GeoPackageScanBuilder(
userDefinedSchema: Option[StructType] = None)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name, field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
val fileIndexAdjusted =
if (loadOptions.showMetadata)
@@ -52,6 +64,7 @@ class GeoPackageScanBuilder(
fileIndexAdjusted,
dataSchema,
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
loadOptions)
}
diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..498933de30 100644
--- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, MetadataSchema, TableType}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
@@ -40,7 +41,8 @@ case class GeoPackageTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
loadOptions: GeoPackageOptions)
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
@@ -74,6 +76,8 @@ case class GeoPackageTable(
"GeoPackage"
}
+ override def metadataColumns(): Array[MetadataColumn] = GeoPackageTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new GeoPackageScanBuilder(
sparkSession,
@@ -87,5 +91,22 @@ case class GeoPackageTable(
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
null
}
+}
+
+object GeoPackageTable {
+
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+ private[geopackage] val fileMetadataColumns: Array[MetadataColumn] = Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
}
diff --git a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
index 66fa147bc5..f37298661f 100644
--- a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
+++ b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/GeoPackageReaderTest.scala
@@ -19,7 +19,7 @@
package org.apache.sedona.sql
import io.minio.{MakeBucketArgs, MinioClient, PutObjectArgs}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types.{BinaryType, BooleanType, DateType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType}
@@ -321,6 +321,78 @@ class GeoPackageReaderTest extends TestBaseScala with Matchers {
}
}
+ describe("_metadata hidden column support") {
+ it("should expose _metadata struct with all expected fields") {
+ val df = readFeatureData("point1")
+ val metaDf = df.select("_metadata")
+ val metaSchema = metaDf.schema.fields.head.dataType.asInstanceOf[StructType]
+ val fieldNames = metaSchema.fieldNames.toSet
+ fieldNames should contain("file_path")
+ fieldNames should contain("file_name")
+ fieldNames should contain("file_size")
+ fieldNames should contain("file_block_start")
+ fieldNames should contain("file_block_length")
+ fieldNames should contain("file_modification_time")
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = readFeatureData("point1")
+ val starCols = df.select("*").columns.toSet
+ starCols should not contain "_metadata"
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = readFeatureData("point1")
+ val row = df.select("_metadata.file_path", "_metadata.file_name").head()
+ val filePath = row.getString(0)
+ val fileName = row.getString(1)
+ filePath should endWith("example.gpkg")
+ fileName shouldEqual "example.gpkg"
+ }
+
+ it("should return actual file_size matching the .gpkg file on disk") {
+ val df = readFeatureData("point1")
+ val metaFileSize = df.select("_metadata.file_size").head().getLong(0)
+ val actualFile = new java.io.File(path)
+ metaFileSize shouldEqual actualFile.length()
+ }
+
+ it("should return file_block_start=0 and file_block_length=file_size") {
+ val df = readFeatureData("point1")
+ val row = df
+ .select(
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_size")
+ .head()
+ row.getLong(0) shouldEqual 0L
+ row.getLong(1) shouldEqual row.getLong(2)
+ }
+
+ it("should return file_modification_time matching the .gpkg file on disk")
{
+ val df = readFeatureData("point1")
+ val metaModTime =
df.select("_metadata.file_modification_time").head().getTimestamp(0)
+ val actualFile = new java.io.File(path)
+ val expectedModTime = new java.sql.Timestamp(actualFile.lastModified())
+ metaModTime shouldEqual expectedModTime
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = readFeatureData("point1")
+ val filtered = df.filter(df("_metadata.file_name") === "example.gpkg")
+ filtered.count() shouldEqual df.count()
+ val empty = df.filter(df("_metadata.file_name") === "nonexistent.gpkg")
+ empty.count() shouldEqual 0
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = readFeatureData("point1")
+ val result = df.select("id", "_metadata.file_name").head()
+ result.getInt(0) shouldEqual 1
+ result.getString(1) shouldEqual "example.gpkg"
+ }
+ }
+
private def readFeatureData(tableName: String): DataFrame = {
sparkSession.read
.format("geopackage")