This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e6303e703 [GH-2650] Fix warning message when reading shapefiles from 
S3 (#2655)
1e6303e703 is described below

commit 1e6303e70320549ffb013e28914f27d9bd6fdd0c
Author: Jia Yu <[email protected]>
AuthorDate: Sun Feb 15 02:06:06 2026 -0700

    [GH-2650] Fix warning message when reading shapefiles from S3 (#2655)
---
 .../datasources/SedonaFileIndexHelper.scala        | 66 ++++++++++++++++++++++
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 .../org/apache/sedona/sql/ShapefileTests.scala     | 38 +++++++++++++
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 .../org/apache/sedona/sql/ShapefileTests.scala     | 38 +++++++++++++
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 .../org/apache/sedona/sql/ShapefileTests.scala     | 38 +++++++++++++
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 .../org/apache/sedona/sql/ShapefileTests.scala     | 38 +++++++++++++
 17 files changed, 318 insertions(+), 12 deletions(-)

diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
new file mode 100644
index 0000000000..7971a1b4ba
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+/**
+ * Helper for creating a [[PartitioningAwareFileIndex]] without going through 
the
+ * [[org.apache.spark.sql.execution.streaming.FileStreamSink.hasMetadata]] 
check in
+ * [[org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex]].
+ *
+ * <p>The streaming metadata check can produce spurious 
[[java.io.FileNotFoundException]] warnings
+ * when reading from cloud storage (e.g., S3) because it attempts to stat the 
path as a directory.
+ * For non-streaming, read-only file tables such as Shapefile and GeoPackage, 
this check is
+ * unnecessary and can be safely bypassed.
+ */
+object SedonaFileIndexHelper {
+
+  /**
+   * Build an [[InMemoryFileIndex]] for the given paths, resolving globs if 
necessary, without the
+   * streaming metadata directory check.
+   */
+  def createFileIndex(
+      sparkSession: SparkSession,
+      options: CaseInsensitiveStringMap,
+      paths: Seq[String],
+      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    val globPathsEnabled =
+      Option(options.get("globPaths")).map(v => 
java.lang.Boolean.parseBoolean(v)).getOrElse(true)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+      paths,
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+      enableGlobbing = globPathsEnabled)
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(
+      sparkSession,
+      rootPathsSpecified,
+      caseSensitiveMap,
+      userSpecifiedSchema,
+      fileStatusCache)
+  }
+}
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, TableCapability}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, LongType, StringType, 
StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git 
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala 
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 96071b8036..47d5ea1db5 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
 package org.apache.sedona.sql
 
 import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{DateType, DecimalType, LongType, 
StringType, StructField, StructType, TimestampType}
 import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
 import java.nio.file.Files
+import java.util.{ArrayList => JList}
 import scala.collection.mutable
 
 class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -943,5 +946,40 @@ class ShapefileTests extends TestBaseScala with 
BeforeAndAfterAll {
       assert(r2.getLong(4) == dt2Shp.length())
       assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
     }
+
+    it("reading shapefile by .shp path should not produce FileStreamSink 
metadata warning") {
+      // GH-2650: When reading shapefiles by .shp path, 
ShapefileDataSource.transformPaths
+      // converts it to a glob pattern (e.g., "file.???"). Without the fix, 
Spark's
+      // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to 
stat the glob
+      // path as a directory, causing a FileNotFoundException and a spurious 
WARN log:
+      // "Assume no metadata directory. Error while looking for metadata 
directory..."
+      val capturedWarnings = new JList[String]()
+      val appender = new AppenderSkeleton {
+        override def append(event: LoggingEvent): Unit = {
+          val msg = event.getRenderedMessage
+          if (msg != null && msg.contains("Assume no metadata directory")) {
+            capturedWarnings.add(msg)
+          }
+        }
+        override def close(): Unit = {}
+        override def requiresLayout(): Boolean = false
+      }
+      appender.setThreshold(Level.WARN)
+      val rootLogger = Logger.getRootLogger
+      rootLogger.addAppender(appender)
+      try {
+        val df = sparkSession.read
+          .format("shapefile")
+          .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+        df.collect()
+        assert(
+          capturedWarnings.isEmpty,
+          "FileStreamSink metadata warning should not be emitted when reading 
shapefiles " +
+            "by .shp path. This warning is caused by 
FileStreamSink.hasMetadata trying to " +
+            "stat the glob path as a directory. Captured warnings: " + 
capturedWarnings)
+      } finally {
+        rootLogger.removeAppender(appender)
+      }
+    }
   }
 }
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, TableCapability}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, LongType, StringType, 
StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 4b4f218b36..1efb8a671a 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
 package org.apache.sedona.sql
 
 import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{DateType, DecimalType, LongType, 
StringType, StructField, StructType, TimestampType}
 import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
 import java.nio.file.Files
+import java.util.{ArrayList => JList}
 import scala.collection.mutable
 
 class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -955,5 +958,40 @@ class ShapefileTests extends TestBaseScala with 
BeforeAndAfterAll {
       assert(r2.getLong(4) == dt2Shp.length())
       assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
     }
+
+    it("reading shapefile by .shp path should not produce FileStreamSink 
metadata warning") {
+      // GH-2650: When reading shapefiles by .shp path, 
ShapefileDataSource.transformPaths
+      // converts it to a glob pattern (e.g., "file.???"). Without the fix, 
Spark's
+      // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to 
stat the glob
+      // path as a directory, causing a FileNotFoundException and a spurious 
WARN log:
+      // "Assume no metadata directory. Error while looking for metadata 
directory..."
+      val capturedWarnings = new JList[String]()
+      val appender = new AppenderSkeleton {
+        override def append(event: LoggingEvent): Unit = {
+          val msg = event.getRenderedMessage
+          if (msg != null && msg.contains("Assume no metadata directory")) {
+            capturedWarnings.add(msg)
+          }
+        }
+        override def close(): Unit = {}
+        override def requiresLayout(): Boolean = false
+      }
+      appender.setThreshold(Level.WARN)
+      val rootLogger = Logger.getRootLogger
+      rootLogger.addAppender(appender)
+      try {
+        val df = sparkSession.read
+          .format("shapefile")
+          .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+        df.collect()
+        assert(
+          capturedWarnings.isEmpty,
+          "FileStreamSink metadata warning should not be emitted when reading 
shapefiles " +
+            "by .shp path. This warning is caused by 
FileStreamSink.hasMetadata trying to " +
+            "stat the glob path as a directory. Captured warnings: " + 
capturedWarnings)
+      } finally {
+        rootLogger.removeAppender(appender)
+      }
+    }
   }
 }
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, TableCapability}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, LongType, StringType, 
StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git 
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala 
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 4b4f218b36..1efb8a671a 100644
--- a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
 package org.apache.sedona.sql
 
 import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{DateType, DecimalType, LongType, 
StringType, StructField, StructType, TimestampType}
 import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
 import java.nio.file.Files
+import java.util.{ArrayList => JList}
 import scala.collection.mutable
 
 class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -955,5 +958,40 @@ class ShapefileTests extends TestBaseScala with 
BeforeAndAfterAll {
       assert(r2.getLong(4) == dt2Shp.length())
       assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
     }
+
+    it("reading shapefile by .shp path should not produce FileStreamSink 
metadata warning") {
+      // GH-2650: When reading shapefiles by .shp path, 
ShapefileDataSource.transformPaths
+      // converts it to a glob pattern (e.g., "file.???"). Without the fix, 
Spark's
+      // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to 
stat the glob
+      // path as a directory, causing a FileNotFoundException and a spurious 
WARN log:
+      // "Assume no metadata directory. Error while looking for metadata 
directory..."
+      val capturedWarnings = new JList[String]()
+      val appender = new AppenderSkeleton {
+        override def append(event: LoggingEvent): Unit = {
+          val msg = event.getRenderedMessage
+          if (msg != null && msg.contains("Assume no metadata directory")) {
+            capturedWarnings.add(msg)
+          }
+        }
+        override def close(): Unit = {}
+        override def requiresLayout(): Boolean = false
+      }
+      appender.setThreshold(Level.WARN)
+      val rootLogger = Logger.getRootLogger
+      rootLogger.addAppender(appender)
+      try {
+        val df = sparkSession.read
+          .format("shapefile")
+          .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+        df.collect()
+        assert(
+          capturedWarnings.isEmpty,
+          "FileStreamSink metadata warning should not be emitted when reading 
shapefiles " +
+            "by .shp path. This warning is caused by 
FileStreamSink.hasMetadata trying to " +
+            "stat the glob path as a directory. Captured warnings: " + 
capturedWarnings)
+      } finally {
+        rootLogger.removeAppender(appender)
+      }
+    }
   }
 }
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 498933de30..078c9f2354 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, 
LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -44,6 +44,13 @@ case class GeoPackageTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 1f4bd093b9..1903623a6e 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, TableCapability}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, LongType, StringType, 
StructField, StructType, TimestampType}
@@ -52,6 +52,13 @@ case class ShapefileTable(
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema)
     with SupportsMetadataColumns {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git 
a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala 
b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
index 4b4f218b36..1efb8a671a 100644
--- a/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
+++ b/spark/spark-4.1/src/test/scala/org/apache/sedona/sql/ShapefileTests.scala
@@ -19,6 +19,8 @@
 package org.apache.sedona.sql
 
 import org.apache.commons.io.FileUtils
+import org.apache.log4j.{AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{DateType, DecimalType, LongType, 
StringType, StructField, StructType, TimestampType}
 import org.locationtech.jts.geom.{Geometry, MultiPolygon, Point, Polygon}
@@ -27,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
 import java.nio.file.Files
+import java.util.{ArrayList => JList}
 import scala.collection.mutable
 
 class ShapefileTests extends TestBaseScala with BeforeAndAfterAll {
@@ -955,5 +958,40 @@ class ShapefileTests extends TestBaseScala with 
BeforeAndAfterAll {
       assert(r2.getLong(4) == dt2Shp.length())
       assert(r2.getTimestamp(5).getTime == dt2Shp.lastModified())
     }
+
+    it("reading shapefile by .shp path should not produce FileStreamSink 
metadata warning") {
+      // GH-2650: When reading shapefiles by .shp path, 
ShapefileDataSource.transformPaths
+      // converts it to a glob pattern (e.g., "file.???"). Without the fix, 
Spark's
+      // FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to 
stat the glob
+      // path as a directory, causing a FileNotFoundException and a spurious 
WARN log:
+      // "Assume no metadata directory. Error while looking for metadata 
directory..."
+      val capturedWarnings = new JList[String]()
+      val appender = new AppenderSkeleton {
+        override def append(event: LoggingEvent): Unit = {
+          val msg = event.getRenderedMessage
+          if (msg != null && msg.contains("Assume no metadata directory")) {
+            capturedWarnings.add(msg)
+          }
+        }
+        override def close(): Unit = {}
+        override def requiresLayout(): Boolean = false
+      }
+      appender.setThreshold(Level.WARN)
+      val rootLogger = Logger.getRootLogger
+      rootLogger.addAppender(appender)
+      try {
+        val df = sparkSession.read
+          .format("shapefile")
+          .load(resourceFolder + "shapefiles/datatypes/datatypes1.shp")
+        df.collect()
+        assert(
+          capturedWarnings.isEmpty,
+          "FileStreamSink metadata warning should not be emitted when reading 
shapefiles " +
+            "by .shp path. This warning is caused by 
FileStreamSink.hasMetadata trying to " +
+            "stat the glob path as a directory. Captured warnings: " + 
capturedWarnings)
+      } finally {
+        rootLogger.removeAppender(appender)
+      }
+    }
   }
 }


Reply via email to