This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 1e6303e703 [GH-2650] Fix warning message when reading shapefiles from S3 (#2655)
1e6303e703 is described below
commit 1e6303e70320549ffb013e28914f27d9bd6fdd0c
Author: Jia Yu <[email protected]>
AuthorDate: Sun Feb 15 02:06:06 2026 -0700
[GH-2650] Fix warning message when reading shapefiles from S3 (#2655)
---
.../datasources/SedonaFileIndexHelper.scala | 66 ++++++++++++++++++++++
.../datasources/geopackage/GeoPackageTable.scala | 9 ++-
.../sql/datasources/shapefile/ShapefileTable.scala | 9 ++-
.../metadata/GeoParquetMetadataTable.scala | 10 +++-
.../org/apache/sedona/sql/ShapefileTests.scala | 38 +++++++++++++
.../datasources/geopackage/GeoPackageTable.scala | 9 ++-
.../sql/datasources/shapefile/ShapefileTable.scala | 9 ++-
.../metadata/GeoParquetMetadataTable.scala | 10 +++-
.../org/apache/sedona/sql/ShapefileTests.scala | 38 +++++++++++++
.../datasources/geopackage/GeoPackageTable.scala | 9 ++-
.../sql/datasources/shapefile/ShapefileTable.scala | 9 ++-
.../metadata/GeoParquetMetadataTable.scala | 10 +++-
.../org/apache/sedona/sql/ShapefileTests.scala | 38 +++++++++++++
.../datasources/geopackage/GeoPackageTable.scala | 9 ++-
.../sql/datasources/shapefile/ShapefileTable.scala | 9 ++-
.../metadata/GeoParquetMetadataTable.scala | 10 +++-
.../org/apache/sedona/sql/ShapefileTests.scala | 38 +++++++++++++
17 files changed, 318 insertions(+), 12 deletions(-)
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
new file mode 100644
index 0000000000..7971a1b4ba
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+/**
+ * Helper for creating a [[PartitioningAwareFileIndex]] without going through the
+ * [[org.apache.spark.sql.execution.streaming.FileStreamSink.hasMetadata]] check in
+ * [[org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex]].
+ *
+ * <p>The streaming metadata check can produce spurious [[java.io.FileNotFoundException]] warnings
+ * when reading from cloud storage (e.g., S3) because it attempts to stat the path as a directory.
+ * For non-streaming, read-only file tables such as Shapefile and GeoPackage, this check is
+ * unnecessary and can be safely bypassed.
+ */
+object SedonaFileIndexHelper {
+
+  /**
+   * Build an [[InMemoryFileIndex]] for the given paths, resolving globs if necessary, without the
+   * streaming metadata directory check.
+   */
+  def createFileIndex(
+      sparkSession: SparkSession,
+      options: CaseInsensitiveStringMap,
+      paths: Seq[String],
+      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    val globPathsEnabled =
+      Option(options.get("globPaths")).map(v => java.lang.Boolean.parseBoolean(v)).getOrElse(true)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+      paths,
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+      enableGlobbing = globPathsEnabled)
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(
+      sparkSession,
+      rootPathsSpecified,
+      caseSensitiveMap,
+      userSpecifiedSchema,
+      fileStatusCache)
+  }
+}
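For context, the read pattern that previously triggered the warning looks roughly like the following (a minimal sketch; the bucket and key are hypothetical):

    // Loading a shapefile by its .shp path. ShapefileDataSource.transformPaths
    // rewrites the path to a glob such as "s3a://my-bucket/data/roads.???",
    // presumably so the sibling .shx/.dbf files are matched as well.
    val df = spark.read
      .format("shapefile")
      .load("s3a://my-bucket/data/roads.shp")
    // Pre-fix, materializing the scan logged the spurious WARN:
    // "Assume no metadata directory. Error while looking for metadata directory..."
    df.count()

With the helper above, the glob is handed straight to InMemoryFileIndex, so the FileStreamSink.hasMetadata probe never runs.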
diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
return MetadataSchema.schema
diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "Shapefile"
override def capabilities: java.util.Set[TableCapability] =
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "GeoParquet Metadata"
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 96071b8036..47d5ea1db5 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types.{DateType, DecimalType, LongType, StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import java.io.File
import java.nio.file.Files
+import java.util.{ArrayList => JList}
import scala.collection.mutable
class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -943,5 +946,40 @@ class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
assert(r2.getLong(4) == dt2Shp.length())
assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
}
+
+ it("reading shapefile by .shp path should not produce FileStreamSink
metadata warning") {
+ // GH-2650: When reading shapefiles by .shp path,
ShapefileDataSource.transformPaths
+ // converts it to a glob pattern (e.g., "file.???"). Without the fix,
Spark's
+ // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to
stat the glob
+ // path as a directory, causing a FileNotFoundException and a spurious
WARN log:
+ // "Assume no metadata directory. Error while looking for metadata
directory..."
+ val capturedWarnings = new JList[String]()
+ val appender = new AppenderSkeleton {
+ override def append(event: LoggingEvent): Unit = {
+ val msg = event.getRenderedMessage
+ if (msg != null && msg.contains("Assume no metadata directory")) {
+ capturedWarnings.add(msg)
+ }
+ }
+ override def close(): Unit = {}
+ override def requiresLayout(): Boolean = false
+ }
+ appender.setThreshold(Level.WARN)
+ val rootLogger = Logger.getRootLogger
+ rootLogger.addAppender(appender)
+ try {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+ df.collect()
+ assert(
+ capturedWarnings.isEmpty,
+ "FileStreamSink metadata warning should not be emitted when reading
shapefiles " +
+ "by .shp path. This warning is caused by
FileStreamSink.hasMetadata trying to " +
+ "stat the glob path as a directory. Captured warnings: " +
capturedWarnings)
+ } finally {
+ rootLogger.removeAppender(appender)
+ }
+ }
}
}
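The test above pins down the regression; the code path it guards against is roughly the following (a simplified paraphrase of Spark's FileTable.fileIndex, not the verbatim source):

    // Sketch of Spark's default behavior: before building a file index for a
    // single input path, FileTable.fileIndex probes the path for a streaming
    // sink's _spark_metadata directory. For a glob like "datatypes1.???" the
    // getFileStatus call inside FileStreamSink.hasMetadata throws
    // FileNotFoundException, which Spark swallows and logs as the WARN this
    // test asserts is absent.
    if (FileStreamSink.hasMetadata(paths, hadoopConf, sqlConf)) {
      // treat the path as streaming sink output and read its metadata log
    } else {
      // build a plain InMemoryFileIndex, which is what SedonaFileIndexHelper
      // now does directly for Sedona's batch-only file tables
    }

Skipping the probe is safe here because the Shapefile, GeoPackage, and GeoParquet metadata sources are non-streaming batch readers.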
diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
return MetadataSchema.schema
diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "Shapefile"
override def capabilities: java.util.Set[TableCapability] =
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "GeoParquet Metadata"
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 4b4f218b36..1efb8a671a 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types.{DateType, DecimalType, LongType, StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import java.io.File
import java.nio.file.Files
+import java.util.{ArrayList => JList}
import scala.collection.mutable
class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -955,5 +958,40 @@ class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
assert(r2.getLong(4) == dt2Shp.length())
assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
}
+
+ it("reading shapefile by .shp path should not produce FileStreamSink
metadata warning") {
+ // GH-2650: When reading shapefiles by .shp path,
ShapefileDataSource.transformPaths
+ // converts it to a glob pattern (e.g., "file.???"). Without the fix,
Spark's
+ // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to
stat the glob
+ // path as a directory, causing a FileNotFoundException and a spurious
WARN log:
+ // "Assume no metadata directory. Error while looking for metadata
directory..."
+ val capturedWarnings = new JList[String]()
+ val appender = new AppenderSkeleton {
+ override def append(event: LoggingEvent): Unit = {
+ val msg = event.getRenderedMessage
+ if (msg != null && msg.contains("Assume no metadata directory")) {
+ capturedWarnings.add(msg)
+ }
+ }
+ override def close(): Unit = {}
+ override def requiresLayout(): Boolean = false
+ }
+ appender.setThreshold(Level.WARN)
+ val rootLogger = Logger.getRootLogger
+ rootLogger.addAppender(appender)
+ try {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+ df.collect()
+ assert(
+ capturedWarnings.isEmpty,
+ "FileStreamSink metadata warning should not be emitted when reading
shapefiles " +
+ "by .shp path. This warning is caused by
FileStreamSink.hasMetadata trying to " +
+ "stat the glob path as a directory. Captured warnings: " +
capturedWarnings)
+ } finally {
+ rootLogger.removeAppender(appender)
+ }
+ }
}
}
diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
return MetadataSchema.schema
diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "Shapefile"
override def capabilities: java.util.Set[TableCapability] =
diff --git a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "GeoParquet Metadata"
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 4b4f218b36..1efb8a671a 100644
--- a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types.{DateType, DecimalType, LongType, StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import java.io.File
import java.nio.file.Files
+import java.util.{ArrayList => JList}
import scala.collection.mutable
class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -955,5 +958,40 @@ class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
assert(r2.getLong(4) == dt2Shp.length())
assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
}
+
+ it("reading shapefile by .shp path should not produce FileStreamSink
metadata warning") {
+ // GH-2650: When reading shapefiles by .shp path,
ShapefileDataSource.transformPaths
+ // converts it to a glob pattern (e.g., "file.???"). Without the fix,
Spark's
+ // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to
stat the glob
+ // path as a directory, causing a FileNotFoundException and a spurious
WARN log:
+ // "Assume no metadata directory. Error while looking for metadata
directory..."
+ val capturedWarnings = new JList[String]()
+ val appender = new AppenderSkeleton {
+ override def append(event: LoggingEvent): Unit = {
+ val msg = event.getRenderedMessage
+ if (msg != null && msg.contains("Assume no metadata directory")) {
+ capturedWarnings.add(msg)
+ }
+ }
+ override def close(): Unit = {}
+ override def requiresLayout(): Boolean = false
+ }
+ appender.setThreshold(Level.WARN)
+ val rootLogger = Logger.getRootLogger
+ rootLogger.addAppender(appender)
+ try {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+ df.collect()
+ assert(
+ capturedWarnings.isEmpty,
+ "FileStreamSink metadata warning should not be emitted when reading
shapefiles " +
+ "by .shp path. This warning is caused by
FileStreamSink.hasMetadata trying to " +
+ "stat the glob path as a directory. Captured warnings: " +
capturedWarnings)
+ } finally {
+ rootLogger.removeAppender(appender)
+ }
+ }
}
}
diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
if (loadOptions.showMetadata) {
return MetadataSchema.schema
diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
with SupportsMetadataColumns {
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "Shapefile"
override def capabilities: java.util.Set[TableCapability] =
diff --git a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper}
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes
+  // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema)
+
override def formatName: String = "GeoParquet Metadata"
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 4b4f218b36..1efb8a671a 100644
--- a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
package org.apache.sedona.sql
import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types.{DateType, DecimalType, LongType, StringType, StructField, StructType, TimestampType}
import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import java.io.File
import java.nio.file.Files
+import java.util.{ArrayList => JList}
import scala.collection.mutable
class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -955,5 +958,40 @@ class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
assert(r2.getLong(4) == dt2Shp.length())
assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
}
+
+ it("reading shapefile by .shp path should not produce FileStreamSink
metadata warning") {
+ // GH-2650: When reading shapefiles by .shp path,
ShapefileDataSource.transformPaths
+ // converts it to a glob pattern (e.g., "file.???"). Without the fix,
Spark's
+ // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to
stat the glob
+ // path as a directory, causing a FileNotFoundException and a spurious
WARN log:
+ // "Assume no metadata directory. Error while looking for metadata
directory..."
+ val capturedWarnings = new JList[String]()
+ val appender = new AppenderSkeleton {
+ override def append(event: LoggingEvent): Unit = {
+ val msg = event.getRenderedMessage
+ if (msg != null && msg.contains("Assume no metadata directory")) {
+ capturedWarnings.add(msg)
+ }
+ }
+ override def close(): Unit = {}
+ override def requiresLayout(): Boolean = false
+ }
+ appender.setThreshold(Level.WARN)
+ val rootLogger = Logger.getRootLogger
+ rootLogger.addAppender(appender)
+ try {
+ val df = sparkSession.read
+ .format("shapefile")
+ .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+ df.collect()
+ assert(
+ capturedWarnings.isEmpty,
+ "FileStreamSink metadata warning should not be emitted when reading
shapefiles " +
+ "by .shp path. This warning is caused by
FileStreamSink.hasMetadata trying to " +
+ "stat the glob path as a directory. Captured warnings: " +
capturedWarnings)
+ } finally {
+ rootLogger.removeAppender(appender)
+ }
+ }
}
}