This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 0e52724a1e [SEDONA-729] Add _metadata hidden column support for
shapefile DataSource V2 reader (#2653)
0e52724a1e is described below
commit 0e52724a1ed1b0c255a7d5cb4ff74d5fcbd437c0
Author: Jia Yu <[email protected]>
AuthorDate: Sun Feb 15 01:05:46 2026 -0700
[SEDONA-729] Add _metadata hidden column support for shapefile DataSource
V2 reader (#2653)
---
.../ShapefilePartitionReaderFactory.scala | 90 +++++++++-
.../sql/datasources/shapefile/ShapefileScan.scala | 11 ++
.../shapefile/ShapefileScanBuilder.scala | 23 +++
.../sql/datasources/shapefile/ShapefileTable.scala | 55 +++++-
.../org/apache/sedona/sql/ShapefileTests.scala | 177 +++++++++++++++++++-
.../ShapefilePartitionReaderFactory.scala | 90 +++++++++-
.../sql/datasources/shapefile/ShapefileScan.scala | 11 ++
.../shapefile/ShapefileScanBuilder.scala | 23 +++
.../sql/datasources/shapefile/ShapefileTable.scala | 55 +++++-
.../org/apache/sedona/sql/ShapefileTests.scala | 185 ++++++++++++++++++++-
.../ShapefilePartitionReaderFactory.scala | 90 +++++++++-
.../sql/datasources/shapefile/ShapefileScan.scala | 11 ++
.../shapefile/ShapefileScanBuilder.scala | 23 +++
.../sql/datasources/shapefile/ShapefileTable.scala | 55 +++++-
.../org/apache/sedona/sql/ShapefileTests.scala | 185 ++++++++++++++++++++-
.../ShapefilePartitionReaderFactory.scala | 90 +++++++++-
.../sql/datasources/shapefile/ShapefileScan.scala | 11 ++
.../shapefile/ShapefileScanBuilder.scala | 23 +++
.../sql/datasources/shapefile/ShapefileTable.scala | 55 +++++-
.../org/apache/sedona/sql/ShapefileTests.scala | 185 ++++++++++++++++++++-
20 files changed, 1416 insertions(+), 32 deletions(-)
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
index 5a28af6d66..79c0638bd6 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
@@ -18,8 +18,11 @@
*/
package org.apache.sedona.sql.datasources.shapefile
+import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -28,14 +31,19 @@ import
org.apache.spark.sql.execution.datasources.v2.PartitionReaderWithPartitionValues
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
+import java.util.Locale
+
case class ShapefilePartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: ShapefileReadOptions,
filters: Seq[Filter])
extends PartitionReaderFactory {
@@ -48,11 +56,51 @@ case class ShapefilePartitionReaderFactory(
partitionedFiles,
readDataSchema,
options)
- new PartitionReaderWithPartitionValues(
+ val withPartitionValues = new PartitionReaderWithPartitionValues(
fileReader,
readDataSchema,
partitionSchema,
partitionedFiles.head.partitionValues)
+
+ if (metadataSchema.nonEmpty) {
+ // Build metadata values from the .shp file's partition information.
+ // We use the .shp file because it is the primary shapefile component
and its path
+ // is what users would expect to see in _metadata.file_path /
_metadata.file_name.
+ val shpFile = partitionedFiles
+
.find(_.filePath.toPath.getName.toLowerCase(Locale.ROOT).endsWith(".shp"))
+ .getOrElse(partitionedFiles.head)
+ val filePath = shpFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ // Complete map of all metadata field values keyed by field name.
+ // The modificationTime from PartitionedFile is in milliseconds but
Spark's
+ // TimestampType uses microseconds, so we multiply by 1000.
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> shpFile.fileSize,
+ "file_block_start" -> shpFile.start,
+ "file_block_length" -> shpFile.length,
+ "file_modification_time" -> (shpFile.modificationTime * 1000L))
+
+ // The metadataSchema may be pruned by Spark's column pruning (e.g.,
when the query
+ // only selects `_metadata.file_name`). We must construct the inner
struct to match
+ // the pruned schema exactly, otherwise field ordinals will be
misaligned.
+ val innerStructType =
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f =>
allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+
+ // Wrap the struct in an outer row since _metadata is a single
StructType column
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+ val baseSchema = StructType(readDataSchema.fields ++
partitionSchema.fields)
+ new PartitionReaderWithMetadata(
+ withPartitionValues,
+ baseSchema,
+ metadataSchema,
+ metadataRow)
+ } else {
+ withPartitionValues
+ }
}
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
@@ -64,3 +112,43 @@ case class ShapefilePartitionReaderFactory(
}
}
}
+
+/**
+ * Wraps a partition reader to append metadata column values to each row. This
follows the same
+ * pattern as [[PartitionReaderWithPartitionValues]] but for metadata columns:
it uses a
+ * [[JoinedRow]] to concatenate the base row (data + partition values) with
the metadata row, then
+ * projects the combined row through an
+ * [[org.apache.spark.sql.catalyst.expressions.UnsafeProjection]] to produce a
compact unsafe row.
+ *
+ * @param reader
+ * the underlying reader that produces data + partition value rows
+ * @param baseSchema
+ * the combined schema of data columns and partition columns
+ * @param metadataSchema
+ * the schema of the metadata columns being appended
+ * @param metadataValues
+ * the constant metadata values to append to every row
+ */
+private[shapefile] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map {
case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
index afdface894..3f6a9224aa 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
@@ -44,12 +44,22 @@ case class ShapefileScan(
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter],
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty)
extends FileScan {
+ /**
+ * Returns the complete read schema including data columns, partition
columns, and any requested
+ * metadata columns. Metadata columns are appended last so the reader
factory can construct a
+ * [[JoinedRow]] that appends metadata values after data and partition
values.
+ */
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++
metadataSchema.fields)
+
override def createReaderFactory(): PartitionReaderFactory = {
val caseSensitiveMap = options.asScala.toMap
val hadoopConf =
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
@@ -61,6 +71,7 @@ case class ShapefileScan(
dataSchema,
readDataSchema,
readPartitionSchema,
+ metadataSchema,
ShapefileReadOptions.parse(options),
pushedFilters)
}
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
index e5135e381d..48b5e45d53 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
@@ -33,6 +33,28 @@ case class ShapefileScanBuilder(
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ /**
+ * Tracks any metadata fields (e.g., from `_metadata`) requested in the
query. Populated by
+ * [[pruneColumns]] when Spark pushes down column projections.
+ */
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ /**
+ * Intercepts Spark's column pruning to separate metadata columns from
data/partition columns.
+ * Fields in [[requiredSchema]] that do not belong to the data schema or
partition schema are
+ * assumed to be metadata fields (e.g., `_metadata`). These are captured in
+ * [[_requiredMetadataSchema]] so the scan can include them in the output.
+ */
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name,
field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
ShapefileScan(
sparkSession,
@@ -40,6 +62,7 @@ case class ShapefileScanBuilder(
dataSchema,
readDataSchema(),
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
pushedDataFilters,
partitionFilters,
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..1f4bd093b9 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -21,19 +21,27 @@ package org.apache.sedona.sql.datasources.shapefile
import org.apache.hadoop.fs.FileStatus
import
org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.dbf.DbfParseUtil
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.{MetadataColumn,
SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema,
fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, LongType, StringType,
StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
import java.util.Locale
import scala.collection.JavaConverters._
+/**
+ * A Spark DataSource V2 table implementation for reading Shapefiles.
+ *
+ * Extends [[FileTable]] to leverage Spark's file-based scan infrastructure
and implements
+ * [[SupportsMetadataColumns]] to expose hidden metadata columns (e.g.,
`_metadata`) that provide
+ * file-level information such as path, name, size, and modification time.
These metadata columns
+ * are not part of the user-visible schema but can be explicitly selected in
queries.
+ */
case class ShapefileTable(
name: String,
sparkSession: SparkSession,
@@ -41,7 +49,8 @@ case class ShapefileTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def formatName: String = "Shapefile"
@@ -95,9 +104,49 @@ case class ShapefileTable(
}
}
+ /** Returns the metadata columns that this table exposes as hidden columns.
*/
+ override def metadataColumns(): Array[MetadataColumn] =
ShapefileTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
ShapefileScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null
}
+
+object ShapefileTable {
+
+ /**
+ * Schema of the `_metadata` struct column exposed by
[[SupportsMetadataColumns]]. Each field
+ * provides file-level information about the source shapefile:
+ *
+ * - `file_path`: The fully qualified path of the `.shp` file (e.g.,
+ * `hdfs://host/data/file.shp`).
+ * - `file_name`: The name of the `.shp` file without directory components
(e.g., `file.shp`).
+ * - `file_size`: The total size of the `.shp` file in bytes.
+ * - `file_block_start`: The byte offset within the file where this
partition's data begins.
+ * For non-splittable formats this is typically 0.
+ * - `file_block_length`: The number of bytes in this partition's data
block. For
+ * non-splittable formats this equals the file size.
+ * - `file_modification_time`: The last modification timestamp of the
`.shp` file.
+ */
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+
+ /**
+ * The single metadata column `_metadata` exposed to Spark's catalog. This
hidden column can be
+ * selected in queries (e.g., `SELECT _metadata.file_name FROM
shapefile.`...``) but does not
+ * appear in `SELECT *`.
+ */
+ private[shapefile] val fileMetadataColumns: Array[MetadataColumn] =
Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
+}
diff --git
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index c2e88c469b..96071b8036 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -20,7 +20,7 @@ package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
import org.locationtech.jts.io.{WKTReader, WKTWriter}
import org.scalatest.BeforeAndAfterAll
@@ -768,5 +768,180 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
}
+
+ it("should expose _metadata struct with all expected fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaDf = df.select("_metadata")
+ val metaSchema =
metaDf.schema("_metadata").dataType.asInstanceOf[StructType]
+ val expectedFields =
+ Seq(
+ "file_path",
+ "file_name",
+ "file_size",
+ "file_block_start",
+ "file_block_length",
+ "file_modification_time")
+ assert(metaSchema.fieldNames.toSeq == expectedFields)
+ assert(metaSchema("file_path").dataType == StringType)
+ assert(metaSchema("file_name").dataType == StringType)
+ assert(metaSchema("file_size").dataType == LongType)
+ assert(metaSchema("file_block_start").dataType == LongType)
+ assert(metaSchema("file_block_length").dataType == LongType)
+ assert(metaSchema("file_modification_time").dataType == TimestampType)
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val columns = df.columns
+ assert(!columns.contains("_metadata"))
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaRows = df.select("_metadata.file_path",
"_metadata.file_name").distinct().collect()
+ assert(metaRows.length == 1)
+ val filePath = metaRows.head.getString(0)
+ val fileName = metaRows.head.getString(1)
+ assert(filePath.endsWith("gis_osm_pois_free_1.shp"))
+ assert(fileName == "gis_osm_pois_free_1.shp")
+ }
+
+ it("should return actual file_size matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val sizes = df.select("_metadata.file_size").distinct().collect()
+ assert(sizes.length == 1)
+ assert(sizes.head.getLong(0) == expectedSize)
+ }
+
+ it(
+ "should return file_block_start=0 and file_block_length=file_size for
non-splittable shapefiles") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val rows = df
+ .select("_metadata.file_block_start", "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(rows.length == 1)
+ assert(rows.head.getLong(0) == 0L) // file_block_start
+ assert(rows.head.getLong(1) == expectedSize) // file_block_length
+ }
+
+ it("should return file_modification_time matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ // File.lastModified() returns milliseconds, Spark TimestampType stores
microseconds
+ val expectedModTimeMs = shpFile.lastModified()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val times =
+ df.select("_metadata.file_modification_time").distinct().collect()
+ assert(times.length == 1)
+ val modTime = times.head.getTimestamp(0)
+ assert(modTime != null)
+ // Timestamp.getTime() returns milliseconds
+ assert(modTime.getTime == expectedModTimeMs)
+ }
+
+ it("should return correct metadata values per file when reading multiple
shapefiles") {
+ val map1Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map1.shp")
+ val map2Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val metaRows = df
+ .select(
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(metaRows.length == 2)
+ val byName = metaRows.map(r => r.getString(0) -> r).toMap
+ // map1.shp
+ assert(byName("map1.shp").getLong(1) == map1Shp.length())
+ assert(byName("map1.shp").getLong(2) == 0L)
+ assert(byName("map1.shp").getLong(3) == map1Shp.length())
+ // map2.shp
+ assert(byName("map2.shp").getLong(1) == map2Shp.length())
+ assert(byName("map2.shp").getLong(2) == 0L)
+ assert(byName("map2.shp").getLong(3) == map2Shp.length())
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val totalCount = df.count()
+ val map1Df = df.filter(df("_metadata.file_name") === "map1.shp")
+ val map2Df = df.filter(df("_metadata.file_name") === "map2.shp")
+ assert(map1Df.count() > 0)
+ assert(map2Df.count() > 0)
+ assert(map1Df.count() + map2Df.count() == totalCount)
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val result = df.select("osm_id", "_metadata.file_name").collect()
+ assert(result.length == 12873)
+ result.foreach { row =>
+ assert(row.getString(0).nonEmpty)
+ assert(row.getString(1) == "gis_osm_pois_free_1.shp")
+ }
+ }
+
+ it("should return correct metadata for each file in multi-shapefile
directory") {
+ val dt1Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes1.shp")
+ val dt2Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes")
+ val result = df
+ .select(
+ "_metadata.file_path",
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_modification_time")
+ .distinct()
+ .collect()
+ assert(result.length == 2)
+ val byName = result.map(r => r.getString(1) -> r).toMap
+ // datatypes1.shp
+ val r1 = byName("datatypes1.shp")
+ assert(r1.getString(0).endsWith("datatypes1.shp"))
+ assert(r1.getLong(2) == dt1Shp.length())
+ assert(r1.getLong(3) == 0L)
+ assert(r1.getLong(4) == dt1Shp.length())
+ assert(r1.getTimestamp(5).getTime == dt1Shp.lastModified())
+ // datatypes2.shp
+ val r2 = byName("datatypes2.shp")
+ assert(r2.getString(0).endsWith("datatypes2.shp"))
+ assert(r2.getLong(2) == dt2Shp.length())
+ assert(r2.getLong(3) == 0L)
+ assert(r2.getLong(4) == dt2Shp.length())
+ assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
+ }
}
}
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
index 5a28af6d66..79c0638bd6 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
@@ -18,8 +18,11 @@
*/
package org.apache.sedona.sql.datasources.shapefile
+import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -28,14 +31,19 @@ import
org.apache.spark.sql.execution.datasources.v2.PartitionReaderWithPartitionValues
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
+import java.util.Locale
+
case class ShapefilePartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: ShapefileReadOptions,
filters: Seq[Filter])
extends PartitionReaderFactory {
@@ -48,11 +56,51 @@ case class ShapefilePartitionReaderFactory(
partitionedFiles,
readDataSchema,
options)
- new PartitionReaderWithPartitionValues(
+ val withPartitionValues = new PartitionReaderWithPartitionValues(
fileReader,
readDataSchema,
partitionSchema,
partitionedFiles.head.partitionValues)
+
+ if (metadataSchema.nonEmpty) {
+ // Build metadata values from the .shp file's partition information.
+ // We use the .shp file because it is the primary shapefile component
and its path
+ // is what users would expect to see in _metadata.file_path /
_metadata.file_name.
+ val shpFile = partitionedFiles
+
.find(_.filePath.toPath.getName.toLowerCase(Locale.ROOT).endsWith(".shp"))
+ .getOrElse(partitionedFiles.head)
+ val filePath = shpFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ // Complete map of all metadata field values keyed by field name.
+ // The modificationTime from PartitionedFile is in milliseconds but
Spark's
+ // TimestampType uses microseconds, so we multiply by 1000.
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> shpFile.fileSize,
+ "file_block_start" -> shpFile.start,
+ "file_block_length" -> shpFile.length,
+ "file_modification_time" -> (shpFile.modificationTime * 1000L))
+
+ // The metadataSchema may be pruned by Spark's column pruning (e.g.,
when the query
+ // only selects `_metadata.file_name`). We must construct the inner
struct to match
+ // the pruned schema exactly, otherwise field ordinals will be
misaligned.
+ val innerStructType =
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f =>
allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+
+ // Wrap the struct in an outer row since _metadata is a single
StructType column
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+ val baseSchema = StructType(readDataSchema.fields ++
partitionSchema.fields)
+ new PartitionReaderWithMetadata(
+ withPartitionValues,
+ baseSchema,
+ metadataSchema,
+ metadataRow)
+ } else {
+ withPartitionValues
+ }
}
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
@@ -64,3 +112,43 @@ case class ShapefilePartitionReaderFactory(
}
}
}
+
+/**
+ * Wraps a partition reader to append metadata column values to each row. This
follows the same
+ * pattern as [[PartitionReaderWithPartitionValues]] but for metadata columns:
it uses a
+ * [[JoinedRow]] to concatenate the base row (data + partition values) with
the metadata row, then
+ * projects the combined row through an
+ * [[org.apache.spark.sql.catalyst.expressions.UnsafeProjection]] to produce a
compact unsafe row.
+ *
+ * @param reader
+ * the underlying reader that produces data + partition value rows
+ * @param baseSchema
+ * the combined schema of data columns and partition columns
+ * @param metadataSchema
+ * the schema of the metadata columns being appended
+ * @param metadataValues
+ * the constant metadata values to append to every row
+ */
+private[shapefile] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map {
case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
index afdface894..3f6a9224aa 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
@@ -44,12 +44,22 @@ case class ShapefileScan(
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter],
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty)
extends FileScan {
+ /**
+ * Returns the complete read schema including data columns, partition
columns, and any requested
+ * metadata columns. Metadata columns are appended last so the reader
factory can construct a
+ * [[JoinedRow]] that appends metadata values after data and partition
values.
+ */
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++
metadataSchema.fields)
+
override def createReaderFactory(): PartitionReaderFactory = {
val caseSensitiveMap = options.asScala.toMap
val hadoopConf =
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
@@ -61,6 +71,7 @@ case class ShapefileScan(
dataSchema,
readDataSchema,
readPartitionSchema,
+ metadataSchema,
ShapefileReadOptions.parse(options),
pushedFilters)
}
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
index e5135e381d..48b5e45d53 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
@@ -33,6 +33,28 @@ case class ShapefileScanBuilder(
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ /**
+ * Tracks any metadata fields (e.g., from `_metadata`) requested in the
query. Populated by
+ * [[pruneColumns]] when Spark pushes down column projections.
+ */
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ /**
+ * Intercepts Spark's column pruning to separate metadata columns from
data/partition columns.
+ * Fields in [[requiredSchema]] that do not belong to the data schema or
partition schema are
+ * assumed to be metadata fields (e.g., `_metadata`). These are captured in
+ * [[_requiredMetadataSchema]] so the scan can include them in the output.
+ */
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name,
field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
ShapefileScan(
sparkSession,
@@ -40,6 +62,7 @@ case class ShapefileScanBuilder(
dataSchema,
readDataSchema(),
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
pushedDataFilters,
partitionFilters,
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..1f4bd093b9 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -21,19 +21,27 @@ package org.apache.sedona.sql.datasources.shapefile
import org.apache.hadoop.fs.FileStatus
import
org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.dbf.DbfParseUtil
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.{MetadataColumn,
SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema,
fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, LongType, StringType,
StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
import java.util.Locale
import scala.collection.JavaConverters._
+/**
+ * A Spark DataSource V2 table implementation for reading Shapefiles.
+ *
+ * Extends [[FileTable]] to leverage Spark's file-based scan infrastructure
and implements
+ * [[SupportsMetadataColumns]] to expose hidden metadata columns (e.g.,
`_metadata`) that provide
+ * file-level information such as path, name, size, and modification time.
These metadata columns
+ * are not part of the user-visible schema but can be explicitly selected in
queries.
+ */
case class ShapefileTable(
name: String,
sparkSession: SparkSession,
@@ -41,7 +49,8 @@ case class ShapefileTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def formatName: String = "Shapefile"
@@ -95,9 +104,49 @@ case class ShapefileTable(
}
}
+ /** Returns the metadata columns that this table exposes as hidden columns.
*/
+ override def metadataColumns(): Array[MetadataColumn] =
ShapefileTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
ShapefileScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null
}
+
+object ShapefileTable {
+
+ /**
+ * Schema of the `_metadata` struct column exposed by
[[SupportsMetadataColumns]]. Each field
+ * provides file-level information about the source shapefile:
+ *
+ * - `file_path`: The fully qualified path of the `.shp` file (e.g.,
+ * `hdfs://host/data/file.shp`).
+ * - `file_name`: The name of the `.shp` file without directory components
(e.g., `file.shp`).
+ * - `file_size`: The total size of the `.shp` file in bytes.
+ * - `file_block_start`: The byte offset within the file where this
partition's data begins.
+ * For non-splittable formats this is typically 0.
+ * - `file_block_length`: The number of bytes in this partition's data
block. For
+ * non-splittable formats this equals the file size.
+ * - `file_modification_time`: The last modification timestamp of the
`.shp` file.
+ */
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+
+ /**
+ * The single metadata column `_metadata` exposed to Spark's catalog. This
hidden column can be
+ * selected in queries (e.g., ``SELECT _metadata.file_name FROM
shapefile.`...` ``) but does not
+ * appear in `SELECT *`.
+ */
+ private[shapefile] val fileMetadataColumns: Array[MetadataColumn] =
Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
+}
diff --git
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 275bc3282f..4b4f218b36 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -20,7 +20,7 @@ package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
import org.locationtech.jts.io.{WKTReader, WKTWriter}
import org.scalatest.BeforeAndAfterAll
@@ -502,7 +502,7 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
.format("shapefile")
.load(temporaryLocation)
.select("part", "id", "aInt", "aUnicode", "geometry")
- var rows = shapefileDf.collect()
+ val rows = shapefileDf.collect()
assert(rows.length == 9)
rows.foreach { row =>
assert(row.getAs[Geometry]("geometry").isInstanceOf[Point])
@@ -519,9 +519,9 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
}
// Using partition filters
- rows = shapefileDf.where("part = 2").collect()
- assert(rows.length == 4)
- rows.foreach { row =>
+ val filteredRows = shapefileDf.where("part = 2").collect()
+ assert(filteredRows.length == 4)
+ filteredRows.foreach { row =>
assert(row.getAs[Geometry]("geometry").isInstanceOf[Point])
assert(row.getAs[Int]("part") == 2)
val id = row.getAs[Long]("id")
@@ -780,5 +780,180 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
}
+
+ it("should expose _metadata struct with all expected fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaDf = df.select("_metadata")
+ val metaSchema =
metaDf.schema("_metadata").dataType.asInstanceOf[StructType]
+ val expectedFields =
+ Seq(
+ "file_path",
+ "file_name",
+ "file_size",
+ "file_block_start",
+ "file_block_length",
+ "file_modification_time")
+ assert(metaSchema.fieldNames.toSeq == expectedFields)
+ assert(metaSchema("file_path").dataType == StringType)
+ assert(metaSchema("file_name").dataType == StringType)
+ assert(metaSchema("file_size").dataType == LongType)
+ assert(metaSchema("file_block_start").dataType == LongType)
+ assert(metaSchema("file_block_length").dataType == LongType)
+ assert(metaSchema("file_modification_time").dataType == TimestampType)
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val columns = df.columns
+ assert(!columns.contains("_metadata"))
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaRows = df.select("_metadata.file_path",
"_metadata.file_name").distinct().collect()
+ assert(metaRows.length == 1)
+ val filePath = metaRows.head.getString(0)
+ val fileName = metaRows.head.getString(1)
+ assert(filePath.endsWith("gis_osm_pois_free_1.shp"))
+ assert(fileName == "gis_osm_pois_free_1.shp")
+ }
+
+ it("should return actual file_size matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val sizes = df.select("_metadata.file_size").distinct().collect()
+ assert(sizes.length == 1)
+ assert(sizes.head.getLong(0) == expectedSize)
+ }
+
+ it(
+ "should return file_block_start=0 and file_block_length=file_size for
non-splittable shapefiles") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val rows = df
+ .select("_metadata.file_block_start", "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(rows.length == 1)
+ assert(rows.head.getLong(0) == 0L) // file_block_start
+ assert(rows.head.getLong(1) == expectedSize) // file_block_length
+ }
+
+ it("should return file_modification_time matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ // File.lastModified() returns milliseconds, Spark TimestampType stores
microseconds
+ val expectedModTimeMs = shpFile.lastModified()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val times =
+ df.select("_metadata.file_modification_time").distinct().collect()
+ assert(times.length == 1)
+ val modTime = times.head.getTimestamp(0)
+ assert(modTime != null)
+ // Timestamp.getTime() returns milliseconds
+ assert(modTime.getTime == expectedModTimeMs)
+ }
+
+ it("should return correct metadata values per file when reading multiple
shapefiles") {
+ val map1Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map1.shp")
+ val map2Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val metaRows = df
+ .select(
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(metaRows.length == 2)
+ val byName = metaRows.map(r => r.getString(0) -> r).toMap
+ // map1.shp
+ assert(byName("map1.shp").getLong(1) == map1Shp.length())
+ assert(byName("map1.shp").getLong(2) == 0L)
+ assert(byName("map1.shp").getLong(3) == map1Shp.length())
+ // map2.shp
+ assert(byName("map2.shp").getLong(1) == map2Shp.length())
+ assert(byName("map2.shp").getLong(2) == 0L)
+ assert(byName("map2.shp").getLong(3) == map2Shp.length())
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val totalCount = df.count()
+ val map1Df = df.filter(df("_metadata.file_name") === "map1.shp")
+ val map2Df = df.filter(df("_metadata.file_name") === "map2.shp")
+ assert(map1Df.count() > 0)
+ assert(map2Df.count() > 0)
+ assert(map1Df.count() + map2Df.count() == totalCount)
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val result = df.select("osm_id", "_metadata.file_name").collect()
+ assert(result.length == 12873)
+ result.foreach { row =>
+ assert(row.getString(0).nonEmpty)
+ assert(row.getString(1) == "gis_osm_pois_free_1.shp")
+ }
+ }
+
+ it("should return correct metadata for each file in multi-shapefile
directory") {
+ val dt1Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes1.shp")
+ val dt2Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes")
+ val result = df
+ .select(
+ "_metadata.file_path",
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_modification_time")
+ .distinct()
+ .collect()
+ assert(result.length == 2)
+ val byName = result.map(r => r.getString(1) -> r).toMap
+ // datatypes1.shp
+ val r1 = byName("datatypes1.shp")
+ assert(r1.getString(0).endsWith("datatypes1.shp"))
+ assert(r1.getLong(2) == dt1Shp.length())
+ assert(r1.getLong(3) == 0L)
+ assert(r1.getLong(4) == dt1Shp.length())
+ assert(r1.getTimestamp(5).getTime == dt1Shp.lastModified())
+ // datatypes2.shp
+ val r2 = byName("datatypes2.shp")
+ assert(r2.getString(0).endsWith("datatypes2.shp"))
+ assert(r2.getLong(2) == dt2Shp.length())
+ assert(r2.getLong(3) == 0L)
+ assert(r2.getLong(4) == dt2Shp.length())
+ assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
+ }
}
}
diff --git
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
index 5a28af6d66..79c0638bd6 100644
---
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
+++
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
@@ -18,8 +18,11 @@
*/
package org.apache.sedona.sql.datasources.shapefile
+import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -28,14 +31,19 @@ import
org.apache.spark.sql.execution.datasources.v2.PartitionReaderWithPartitio
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
+import java.util.Locale
+
case class ShapefilePartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: ShapefileReadOptions,
filters: Seq[Filter])
extends PartitionReaderFactory {
@@ -48,11 +56,51 @@ case class ShapefilePartitionReaderFactory(
partitionedFiles,
readDataSchema,
options)
- new PartitionReaderWithPartitionValues(
+ val withPartitionValues = new PartitionReaderWithPartitionValues(
fileReader,
readDataSchema,
partitionSchema,
partitionedFiles.head.partitionValues)
+
+ if (metadataSchema.nonEmpty) {
+ // Build metadata values from the .shp file's partition information.
+ // We use the .shp file because it is the primary shapefile component
and its path
+ // is what users would expect to see in _metadata.file_path /
_metadata.file_name.
+ val shpFile = partitionedFiles
+
.find(_.filePath.toPath.getName.toLowerCase(Locale.ROOT).endsWith(".shp"))
+ .getOrElse(partitionedFiles.head)
+ val filePath = shpFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ // Complete map of all metadata field values keyed by field name.
+ // The modificationTime from PartitionedFile is in milliseconds but
Spark's
+ // TimestampType uses microseconds, so we multiply by 1000.
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> shpFile.fileSize,
+ "file_block_start" -> shpFile.start,
+ "file_block_length" -> shpFile.length,
+ "file_modification_time" -> (shpFile.modificationTime * 1000L))
+
+ // The metadataSchema may be pruned by Spark's column pruning (e.g.,
when the query
+ // only selects `_metadata.file_name`). We must construct the inner
struct to match
+ // the pruned schema exactly, otherwise field ordinals will be
misaligned.
+ val innerStructType =
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f =>
allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+
+ // Wrap the struct in an outer row since _metadata is a single
StructType column
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+ val baseSchema = StructType(readDataSchema.fields ++
partitionSchema.fields)
+ new PartitionReaderWithMetadata(
+ withPartitionValues,
+ baseSchema,
+ metadataSchema,
+ metadataRow)
+ } else {
+ withPartitionValues
+ }
}
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
@@ -64,3 +112,43 @@ case class ShapefilePartitionReaderFactory(
}
}
}
+
+/**
+ * Wraps a partition reader to append metadata column values to each row. This
follows the same
+ * pattern as [[PartitionReaderWithPartitionValues]] but for metadata columns:
it uses a
+ * [[JoinedRow]] to concatenate the base row (data + partition values) with
the metadata row, then
+ * projects the combined row through an
+ * [[org.apache.spark.sql.catalyst.expressions.UnsafeProjection]] to produce a
compact unsafe row.
+ *
+ * @param reader
+ * the underlying reader that produces data + partition value rows
+ * @param baseSchema
+ * the combined schema of data columns and partition columns
+ * @param metadataSchema
+ * the schema of the metadata columns being appended
+ * @param metadataValues
+ * the constant metadata values to append to every row
+ */
+private[shapefile] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map {
case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
diff --git
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
index afdface894..3f6a9224aa 100644
---
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
+++
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
@@ -44,12 +44,22 @@ case class ShapefileScan(
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter],
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty)
extends FileScan {
+ /**
+ * Returns the complete read schema including data columns, partition
columns, and any requested
+ * metadata columns. Metadata columns are appended last so the reader
factory can construct a
+ * [[JoinedRow]] that appends metadata values after data and partition
values.
+ */
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++
metadataSchema.fields)
+
override def createReaderFactory(): PartitionReaderFactory = {
val caseSensitiveMap = options.asScala.toMap
val hadoopConf =
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
@@ -61,6 +71,7 @@ case class ShapefileScan(
dataSchema,
readDataSchema,
readPartitionSchema,
+ metadataSchema,
ShapefileReadOptions.parse(options),
pushedFilters)
}
diff --git
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
index e5135e381d..48b5e45d53 100644
---
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
+++
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
@@ -33,6 +33,28 @@ case class ShapefileScanBuilder(
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ /**
+ * Tracks any metadata fields (e.g., from `_metadata`) requested in the
query. Populated by
+ * [[pruneColumns]] when Spark pushes down column projections.
+ */
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ /**
+ * Intercepts Spark's column pruning to separate metadata columns from
data/partition columns.
+ * Fields in [[requiredSchema]] that do not belong to the data schema or
partition schema are
+ * assumed to be metadata fields (e.g., `_metadata`). These are captured in
+ * [[_requiredMetadataSchema]] so the scan can include them in the output.
+ */
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name,
field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
ShapefileScan(
sparkSession,
@@ -40,6 +62,7 @@ case class ShapefileScanBuilder(
dataSchema,
readDataSchema(),
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
pushedDataFilters,
partitionFilters,
diff --git
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..1f4bd093b9 100644
---
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -21,19 +21,27 @@ package org.apache.sedona.sql.datasources.shapefile
import org.apache.hadoop.fs.FileStatus
import
org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.dbf.DbfParseUtil
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.{MetadataColumn,
SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema,
fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, LongType, StringType,
StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
import java.util.Locale
import scala.collection.JavaConverters._
+/**
+ * A Spark DataSource V2 table implementation for reading Shapefiles.
+ *
+ * Extends [[FileTable]] to leverage Spark's file-based scan infrastructure
and implements
+ * [[SupportsMetadataColumns]] to expose hidden metadata columns (e.g.,
`_metadata`) that provide
+ * file-level information such as path, name, size, and modification time.
These metadata columns
+ * are not part of the user-visible schema but can be explicitly selected in
queries.
+ */
case class ShapefileTable(
name: String,
sparkSession: SparkSession,
@@ -41,7 +49,8 @@ case class ShapefileTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def formatName: String = "Shapefile"
@@ -95,9 +104,49 @@ case class ShapefileTable(
}
}
+ /** Returns the metadata columns that this table exposes as hidden columns.
*/
+ override def metadataColumns(): Array[MetadataColumn] =
ShapefileTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
ShapefileScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null
}
+
+object ShapefileTable {
+
+ /**
+ * Schema of the `_metadata` struct column exposed by
[[SupportsMetadataColumns]]. Each field
+ * provides file-level information about the source shapefile:
+ *
+ * - `file_path`: The fully qualified path of the `.shp` file (e.g.,
+ * `hdfs://host/data/file.shp`).
+ * - `file_name`: The name of the `.shp` file without directory components
(e.g., `file.shp`).
+ * - `file_size`: The total size of the `.shp` file in bytes.
+ * - `file_block_start`: The byte offset within the file where this
partition's data begins.
+ * For non-splittable formats this is typically 0.
+ * - `file_block_length`: The number of bytes in this partition's data
block. For
+ * non-splittable formats this equals the file size.
+ * - `file_modification_time`: The last modification timestamp of the
`.shp` file.
+ */
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+
+ /**
+ * The single metadata column `_metadata` exposed to Spark's catalog. This
hidden column can be
+ * selected in queries (e.g., ``SELECT _metadata.file_name FROM
shapefile.`...` ``) but does not
+ * appear in `SELECT *`.
+ */
+ private[shapefile] val fileMetadataColumns: Array[MetadataColumn] =
Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
+}
diff --git
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 275bc3282f..4b4f218b36 100644
--- a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -20,7 +20,7 @@ package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
import org.locationtech.jts.io.{WKTReader, WKTWriter}
import org.scalatest.BeforeAndAfterAll
@@ -502,7 +502,7 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
.format("shapefile")
.load(temporaryLocation)
.select("part", "id", "aInt", "aUnicode", "geometry")
- var rows = shapefileDf.collect()
+ val rows = shapefileDf.collect()
assert(rows.length == 9)
rows.foreach { row =>
assert(row.getAs[Geometry]("geometry").isInstanceOf[Point])
@@ -519,9 +519,9 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
}
// Using partition filters
- rows = shapefileDf.where("part = 2").collect()
- assert(rows.length == 4)
- rows.foreach { row =>
+ val filteredRows = shapefileDf.where("part = 2").collect()
+ assert(filteredRows.length == 4)
+ filteredRows.foreach { row =>
assert(row.getAs[Geometry]("geometry").isInstanceOf[Point])
assert(row.getAs[Int]("part") == 2)
val id = row.getAs[Long]("id")
@@ -780,5 +780,180 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
}
+
+ it("should expose _metadata struct with all expected fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaDf = df.select("_metadata")
+ val metaSchema =
metaDf.schema("_metadata").dataType.asInstanceOf[StructType]
+ val expectedFields =
+ Seq(
+ "file_path",
+ "file_name",
+ "file_size",
+ "file_block_start",
+ "file_block_length",
+ "file_modification_time")
+ assert(metaSchema.fieldNames.toSeq == expectedFields)
+ assert(metaSchema("file_path").dataType == StringType)
+ assert(metaSchema("file_name").dataType == StringType)
+ assert(metaSchema("file_size").dataType == LongType)
+ assert(metaSchema("file_block_start").dataType == LongType)
+ assert(metaSchema("file_block_length").dataType == LongType)
+ assert(metaSchema("file_modification_time").dataType == TimestampType)
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val columns = df.columns
+ assert(!columns.contains("_metadata"))
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaRows = df.select("_metadata.file_path",
"_metadata.file_name").distinct().collect()
+ assert(metaRows.length == 1)
+ val filePath = metaRows.head.getString(0)
+ val fileName = metaRows.head.getString(1)
+ assert(filePath.endsWith("gis_osm_pois_free_1.shp"))
+ assert(fileName == "gis_osm_pois_free_1.shp")
+ }
+
+ it("should return actual file_size matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val sizes = df.select("_metadata.file_size").distinct().collect()
+ assert(sizes.length == 1)
+ assert(sizes.head.getLong(0) == expectedSize)
+ }
+
+ it(
+ "should return file_block_start=0 and file_block_length=file_size for
non-splittable shapefiles") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val rows = df
+ .select("_metadata.file_block_start", "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(rows.length == 1)
+ assert(rows.head.getLong(0) == 0L) // file_block_start
+ assert(rows.head.getLong(1) == expectedSize) // file_block_length
+ }
+
+ it("should return file_modification_time matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ // File.lastModified() returns milliseconds, Spark TimestampType stores
microseconds
+ val expectedModTimeMs = shpFile.lastModified()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val times =
+ df.select("_metadata.file_modification_time").distinct().collect()
+ assert(times.length == 1)
+ val modTime = times.head.getTimestamp(0)
+ assert(modTime != null)
+ // Timestamp.getTime() returns milliseconds
+ assert(modTime.getTime == expectedModTimeMs)
+ }
+
+ it("should return correct metadata values per file when reading multiple
shapefiles") {
+ val map1Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map1.shp")
+ val map2Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val metaRows = df
+ .select(
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(metaRows.length == 2)
+ val byName = metaRows.map(r => r.getString(0) -> r).toMap
+ // map1.shp
+ assert(byName("map1.shp").getLong(1) == map1Shp.length())
+ assert(byName("map1.shp").getLong(2) == 0L)
+ assert(byName("map1.shp").getLong(3) == map1Shp.length())
+ // map2.shp
+ assert(byName("map2.shp").getLong(1) == map2Shp.length())
+ assert(byName("map2.shp").getLong(2) == 0L)
+ assert(byName("map2.shp").getLong(3) == map2Shp.length())
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val totalCount = df.count()
+ val map1Df = df.filter(df("_metadata.file_name") === "map1.shp")
+ val map2Df = df.filter(df("_metadata.file_name") === "map2.shp")
+ assert(map1Df.count() > 0)
+ assert(map2Df.count() > 0)
+ assert(map1Df.count() + map2Df.count() == totalCount)
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val result = df.select("osm_id", "_metadata.file_name").collect()
+ assert(result.length == 12873)
+ result.foreach { row =>
+ assert(row.getString(0).nonEmpty)
+ assert(row.getString(1) == "gis_osm_pois_free_1.shp")
+ }
+ }
+
+ it("should return correct metadata for each file in multi-shapefile
directory") {
+ val dt1Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes1.shp")
+ val dt2Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes")
+ val result = df
+ .select(
+ "_metadata.file_path",
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_modification_time")
+ .distinct()
+ .collect()
+ assert(result.length == 2)
+ val byName = result.map(r => r.getString(1) -> r).toMap
+ // datatypes1.shp
+ val r1 = byName("datatypes1.shp")
+ assert(r1.getString(0).endsWith("datatypes1.shp"))
+ assert(r1.getLong(2) == dt1Shp.length())
+ assert(r1.getLong(3) == 0L)
+ assert(r1.getLong(4) == dt1Shp.length())
+ assert(r1.getTimestamp(5).getTime == dt1Shp.lastModified())
+ // datatypes2.shp
+ val r2 = byName("datatypes2.shp")
+ assert(r2.getString(0).endsWith("datatypes2.shp"))
+ assert(r2.getLong(2) == dt2Shp.length())
+ assert(r2.getLong(3) == 0L)
+ assert(r2.getLong(4) == dt2Shp.length())
+ assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
+ }
}
}
diff --git
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
index 5a28af6d66..79c0638bd6 100644
---
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
+++
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefilePartitionReaderFactory.scala
@@ -18,8 +18,11 @@
*/
package org.apache.sedona.sql.datasources.shapefile
+import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow}
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -28,14 +31,19 @@ import
org.apache.spark.sql.execution.datasources.v2.PartitionReaderWithPartitio
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
+import java.util.Locale
+
case class ShapefilePartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: ShapefileReadOptions,
filters: Seq[Filter])
extends PartitionReaderFactory {
@@ -48,11 +56,51 @@ case class ShapefilePartitionReaderFactory(
partitionedFiles,
readDataSchema,
options)
- new PartitionReaderWithPartitionValues(
+ val withPartitionValues = new PartitionReaderWithPartitionValues(
fileReader,
readDataSchema,
partitionSchema,
partitionedFiles.head.partitionValues)
+
+ if (metadataSchema.nonEmpty) {
+ // Build metadata values from the .shp file's partition information.
+ // We use the .shp file because it is the primary shapefile component
and its path
+ // is what users would expect to see in _metadata.file_path /
_metadata.file_name.
+ val shpFile = partitionedFiles
+
.find(_.filePath.toPath.getName.toLowerCase(Locale.ROOT).endsWith(".shp"))
+ .getOrElse(partitionedFiles.head)
+ val filePath = shpFile.filePath.toString
+ val fileName = new Path(filePath).getName
+
+ // Complete map of all metadata field values keyed by field name.
+ // The modificationTime from PartitionedFile is in milliseconds but
Spark's
+ // TimestampType uses microseconds, so we multiply by 1000.
+ val allMetadataValues: Map[String, Any] = Map(
+ "file_path" -> UTF8String.fromString(filePath),
+ "file_name" -> UTF8String.fromString(fileName),
+ "file_size" -> shpFile.fileSize,
+ "file_block_start" -> shpFile.start,
+ "file_block_length" -> shpFile.length,
+ "file_modification_time" -> (shpFile.modificationTime * 1000L))
+
+ // The metadataSchema may be pruned by Spark's column pruning (e.g.,
when the query
+ // only selects `_metadata.file_name`). We must construct the inner
struct to match
+ // the pruned schema exactly, otherwise field ordinals will be
misaligned.
+ val innerStructType =
metadataSchema.fields.head.dataType.asInstanceOf[StructType]
+ val prunedValues = innerStructType.fields.map(f =>
allMetadataValues(f.name))
+ val metadataStruct = InternalRow.fromSeq(prunedValues.toSeq)
+
+ // Wrap the struct in an outer row since _metadata is a single
StructType column
+ val metadataRow = InternalRow.fromSeq(Seq(metadataStruct))
+ val baseSchema = StructType(readDataSchema.fields ++
partitionSchema.fields)
+ new PartitionReaderWithMetadata(
+ withPartitionValues,
+ baseSchema,
+ metadataSchema,
+ metadataRow)
+ } else {
+ withPartitionValues
+ }
}
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
@@ -64,3 +112,43 @@ case class ShapefilePartitionReaderFactory(
}
}
}
+
+/**
+ * Wraps a partition reader to append metadata column values to each row. This
follows the same
+ * pattern as [[PartitionReaderWithPartitionValues]] but for metadata columns:
it uses a
+ * [[JoinedRow]] to concatenate the base row (data + partition values) with
the metadata row, then
+ * projects the combined row through an
+ * [[org.apache.spark.sql.catalyst.expressions.UnsafeProjection]] to produce a
compact unsafe row.
+ *
+ * @param reader
+ * the underlying reader that produces data + partition value rows
+ * @param baseSchema
+ * the combined schema of data columns and partition columns
+ * @param metadataSchema
+ * the schema of the metadata columns being appended
+ * @param metadataValues
+ * the constant metadata values to append to every row
+ */
+private[shapefile] class PartitionReaderWithMetadata(
+ reader: PartitionReader[InternalRow],
+ baseSchema: StructType,
+ metadataSchema: StructType,
+ metadataValues: InternalRow)
+ extends PartitionReader[InternalRow] {
+
+ private val joinedRow = new JoinedRow()
+ private val unsafeProjection =
+ GenerateUnsafeProjection.generate(baseSchema.fields.zipWithIndex.map {
case (f, i) =>
+ BoundReference(i, f.dataType, f.nullable)
+ } ++ metadataSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(baseSchema.length + i, f.dataType, f.nullable)
+ })
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = {
+ unsafeProjection(joinedRow(reader.get(), metadataValues))
+ }
+
+ override def close(): Unit = reader.close()
+}
diff --git
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
index afdface894..3f6a9224aa 100644
---
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
+++
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScan.scala
@@ -44,12 +44,22 @@ case class ShapefileScan(
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
+ /** The metadata fields requested by the query (e.g., fields from
`_metadata`). */
+ metadataSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter],
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty)
extends FileScan {
+ /**
+ * Returns the complete read schema including data columns, partition
columns, and any requested
+ * metadata columns. Metadata columns are appended last so the reader
factory can construct a
+ * [[JoinedRow]] that appends metadata values after data and partition
values.
+ */
+ override def readSchema(): StructType =
+ StructType(readDataSchema.fields ++ readPartitionSchema.fields ++
metadataSchema.fields)
+
override def createReaderFactory(): PartitionReaderFactory = {
val caseSensitiveMap = options.asScala.toMap
val hadoopConf =
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
@@ -61,6 +71,7 @@ case class ShapefileScan(
dataSchema,
readDataSchema,
readPartitionSchema,
+ metadataSchema,
ShapefileReadOptions.parse(options),
pushedFilters)
}
diff --git
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
index e5135e381d..48b5e45d53 100644
---
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
+++
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileScanBuilder.scala
@@ -33,6 +33,28 @@ case class ShapefileScanBuilder(
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ /**
+ * Tracks any metadata fields (e.g., from `_metadata`) requested in the
query. Populated by
+ * [[pruneColumns]] when Spark pushes down column projections.
+ */
+ private var _requiredMetadataSchema: StructType = StructType(Seq.empty)
+
+ /**
+ * Intercepts Spark's column pruning to separate metadata columns from
data/partition columns.
+ * Fields in [[requiredSchema]] that do not belong to the data schema or
partition schema are
+ * assumed to be metadata fields (e.g., `_metadata`). These are captured in
+ * [[_requiredMetadataSchema]] so the scan can include them in the output.
+ */
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val metaFields = requiredSchema.fields.filter { field =>
+ !dataSchema.fields.exists(df => resolver(df.name, field.name)) &&
+ !fileIndex.partitionSchema.fields.exists(pf => resolver(pf.name,
field.name))
+ }
+ _requiredMetadataSchema = StructType(metaFields)
+ super.pruneColumns(requiredSchema)
+ }
+
override def build(): Scan = {
ShapefileScan(
sparkSession,
@@ -40,6 +62,7 @@ case class ShapefileScanBuilder(
dataSchema,
readDataSchema(),
readPartitionSchema(),
+ _requiredMetadataSchema,
options,
pushedDataFilters,
partitionFilters,
diff --git
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..1f4bd093b9 100644
---
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -21,19 +21,27 @@ package org.apache.sedona.sql.datasources.shapefile
import org.apache.hadoop.fs.FileStatus
import
org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.dbf.DbfParseUtil
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.{MetadataColumn,
SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema,
fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, LongType, StringType,
StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration
import java.util.Locale
import scala.collection.JavaConverters._
+/**
+ * A Spark DataSource V2 table implementation for reading Shapefiles.
+ *
+ * Extends [[FileTable]] to leverage Spark's file-based scan infrastructure
and implements
+ * [[SupportsMetadataColumns]] to expose hidden metadata columns (e.g.,
`_metadata`) that provide
+ * file-level information such as path, name, size, and modification time.
These metadata columns
+ * are not part of the user-visible schema but can be explicitly selected in
queries.
+ */
case class ShapefileTable(
name: String,
sparkSession: SparkSession,
@@ -41,7 +49,8 @@ case class ShapefileTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
- extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
+ with SupportsMetadataColumns {
override def formatName: String = "Shapefile"
@@ -95,9 +104,49 @@ case class ShapefileTable(
}
}
+ /** Returns the metadata columns that this table exposes as hidden columns.
*/
+ override def metadataColumns(): Array[MetadataColumn] =
ShapefileTable.fileMetadataColumns
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
ShapefileScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null
}
+
+object ShapefileTable {
+
+ /**
+ * Schema of the `_metadata` struct column exposed by
[[SupportsMetadataColumns]]. Each field
+ * provides file-level information about the source shapefile:
+ *
+ * - `file_path`: The fully qualified path of the `.shp` file (e.g.,
+ * `hdfs://host/data/file.shp`).
+ * - `file_name`: The name of the `.shp` file without directory components
(e.g., `file.shp`).
+ * - `file_size`: The total size of the `.shp` file in bytes.
+ * - `file_block_start`: The byte offset within the file where this
partition's data begins.
+ * For non-splittable formats this is typically 0.
+ * - `file_block_length`: The number of bytes in this partition's data
block. For
+ * non-splittable formats this equals the file size.
+ * - `file_modification_time`: The last modification timestamp of the
`.shp` file.
+ */
+ private val FILE_METADATA_STRUCT_TYPE: StructType = StructType(
+ Seq(
+ StructField("file_path", StringType, nullable = false),
+ StructField("file_name", StringType, nullable = false),
+ StructField("file_size", LongType, nullable = false),
+ StructField("file_block_start", LongType, nullable = false),
+ StructField("file_block_length", LongType, nullable = false),
+ StructField("file_modification_time", TimestampType, nullable = false)))
+
+ /**
+ * The single metadata column `_metadata` exposed to Spark's catalog. This
hidden column can be
+ * selected in queries (e.g., `SELECT _metadata.file_name FROM
shapefile.`...``) but does not
+ * appear in `SELECT *`.
+ */
+ private[shapefile] val fileMetadataColumns: Array[MetadataColumn] =
Array(new MetadataColumn {
+ override def name: String = "_metadata"
+ override def dataType: DataType = FILE_METADATA_STRUCT_TYPE
+ override def isNullable: Boolean = false
+ })
+}
diff --git
a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 275bc3282f..4b4f218b36 100644
--- a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -20,7 +20,7 @@ package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DateType, DecimalType, LongType,
StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
import org.locationtech.jts.io.{WKTReader, WKTWriter}
import org.scalatest.BeforeAndAfterAll
@@ -502,7 +502,7 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
.format("shapefile")
.load(temporaryLocation)
.select("part", "id", "aInt", "aUnicode", "geometry")
- var rows = shapefileDf.collect()
+ val rows = shapefileDf.collect()
assert(rows.length == 9)
rows.foreach { row =>
assert(row.getAs[Geometry]("geometry").isInstanceOf[Point])
@@ -519,9 +519,9 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
}
// Using partition filters
- rows = shapefileDf.where("part = 2").collect()
- assert(rows.length == 4)
- rows.foreach { row =>
+ val filteredRows = shapefileDf.where("part = 2").collect()
+ assert(filteredRows.length == 4)
+ filteredRows.foreach { row =>
assert(row.getAs[Geometry]("geometry").isInstanceOf[Point])
assert(row.getAs[Int]("part") == 2)
val id = row.getAs[Long]("id")
@@ -780,5 +780,180 @@ class ShapefileTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
}
+
+ it("should expose _metadata struct with all expected fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaDf = df.select("_metadata")
+ val metaSchema =
metaDf.schema("_metadata").dataType.asInstanceOf[StructType]
+ val expectedFields =
+ Seq(
+ "file_path",
+ "file_name",
+ "file_size",
+ "file_block_start",
+ "file_block_length",
+ "file_modification_time")
+ assert(metaSchema.fieldNames.toSeq == expectedFields)
+ assert(metaSchema("file_path").dataType == StringType)
+ assert(metaSchema("file_name").dataType == StringType)
+ assert(metaSchema("file_size").dataType == LongType)
+ assert(metaSchema("file_block_start").dataType == LongType)
+ assert(metaSchema("file_block_length").dataType == LongType)
+ assert(metaSchema("file_modification_time").dataType == TimestampType)
+ }
+
+ it("should not include _metadata in select(*)") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val columns = df.columns
+ assert(!columns.contains("_metadata"))
+ }
+
+ it("should return correct file_path and file_name in _metadata") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val metaRows = df.select("_metadata.file_path",
"_metadata.file_name").distinct().collect()
+ assert(metaRows.length == 1)
+ val filePath = metaRows.head.getString(0)
+ val fileName = metaRows.head.getString(1)
+ assert(filePath.endsWith("gis_osm_pois_free_1.shp"))
+ assert(fileName == "gis_osm_pois_free_1.shp")
+ }
+
+ it("should return actual file_size matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val sizes = df.select("_metadata.file_size").distinct().collect()
+ assert(sizes.length == 1)
+ assert(sizes.head.getLong(0) == expectedSize)
+ }
+
+ it(
+ "should return file_block_start=0 and file_block_length=file_size for
non-splittable shapefiles") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ val expectedSize = shpFile.length()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val rows = df
+ .select("_metadata.file_block_start", "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(rows.length == 1)
+ assert(rows.head.getLong(0) == 0L) // file_block_start
+ assert(rows.head.getLong(1) == expectedSize) // file_block_length
+ }
+
+ it("should return file_modification_time matching the .shp file on disk") {
+ val shpFile =
+ new File(resourceFolder +
"shapefiles/gis_osm_pois_free_1/gis_osm_pois_free_1.shp")
+ // File.lastModified() returns milliseconds, Spark TimestampType stores
microseconds
+ val expectedModTimeMs = shpFile.lastModified()
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val times =
+ df.select("_metadata.file_modification_time").distinct().collect()
+ assert(times.length == 1)
+ val modTime = times.head.getTimestamp(0)
+ assert(modTime != null)
+ // Timestamp.getTime() returns milliseconds
+ assert(modTime.getTime == expectedModTimeMs)
+ }
+
+ it("should return correct metadata values per file when reading multiple
shapefiles") {
+ val map1Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map1.shp")
+ val map2Shp =
+ new File(resourceFolder + "shapefiles/multipleshapefiles/map2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val metaRows = df
+ .select(
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length")
+ .distinct()
+ .collect()
+ assert(metaRows.length == 2)
+ val byName = metaRows.map(r => r.getString(0) -> r).toMap
+ // map1.shp
+ assert(byName("map1.shp").getLong(1) == map1Shp.length())
+ assert(byName("map1.shp").getLong(2) == 0L)
+ assert(byName("map1.shp").getLong(3) == map1Shp.length())
+ // map2.shp
+ assert(byName("map2.shp").getLong(1) == map2Shp.length())
+ assert(byName("map2.shp").getLong(2) == 0L)
+ assert(byName("map2.shp").getLong(3) == map2Shp.length())
+ }
+
+ it("should allow filtering on _metadata fields") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/multipleshapefiles")
+ val totalCount = df.count()
+ val map1Df = df.filter(df("_metadata.file_name") === "map1.shp")
+ val map2Df = df.filter(df("_metadata.file_name") === "map2.shp")
+ assert(map1Df.count() > 0)
+ assert(map2Df.count() > 0)
+ assert(map1Df.count() + map2Df.count() == totalCount)
+ }
+
+ it("should select _metadata along with data columns") {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/gis_osm_pois_free_1")
+ val result = df.select("osm_id", "_metadata.file_name").collect()
+ assert(result.length == 12873)
+ result.foreach { row =>
+ assert(row.getString(0).nonEmpty)
+ assert(row.getString(1) == "gis_osm_pois_free_1.shp")
+ }
+ }
+
+ it("should return correct metadata for each file in multi-shapefile
directory") {
+ val dt1Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes1.shp")
+ val dt2Shp = new File(resourceFolder +
"shapefiles/datatypes/datatypes2.shp")
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes")
+ val result = df
+ .select(
+ "_metadata.file_path",
+ "_metadata.file_name",
+ "_metadata.file_size",
+ "_metadata.file_block_start",
+ "_metadata.file_block_length",
+ "_metadata.file_modification_time")
+ .distinct()
+ .collect()
+ assert(result.length == 2)
+ val byName = result.map(r => r.getString(1) -> r).toMap
+ // datatypes1.shp
+ val r1 = byName("datatypes1.shp")
+ assert(r1.getString(0).endsWith("datatypes1.shp"))
+ assert(r1.getLong(2) == dt1Shp.length())
+ assert(r1.getLong(3) == 0L)
+ assert(r1.getLong(4) == dt1Shp.length())
+ assert(r1.getTimestamp(5).getTime == dt1Shp.lastModified())
+ // datatypes2.shp
+ val r2 = byName("datatypes2.shp")
+ assert(r2.getString(0).endsWith("datatypes2.shp"))
+ assert(r2.getLong(2) == dt2Shp.length())
+ assert(r2.getLong(3) == 0L)
+ assert(r2.getLong(4) == dt2Shp.length())
+ assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
+ }
}
}