This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch fix/GH-2650-shapefile-s3-warning
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit d95e9274a2a3ac45ac94143cd2c0c370be4318d0
Author: Jia Yu <[email protected]>
AuthorDate: Sat Feb 14 23:54:10 2026 -0800

    Fix warning message when reading shapefiles from S3
    
    Override fileIndex in ShapefileTable, GeoPackageTable, and
    GeoParquetMetadataTable to skip the FileStreamSink.hasMetadata check.
    
    Spark's FileTable.fileIndex calls FileStreamSink.hasMetadata which tries
    to stat the input paths as directories. For shapefile paths that get
    transformed to glob patterns (e.g., file.???), this causes
    FileNotFoundException warnings on S3.
    
    The fix creates SedonaFileIndexHelper in the
    org.apache.spark.sql.execution.datasources package to access the
    package-private DataSource.checkAndGlobPathIfNecessary
    method directly, bypassing the streaming metadata check that is
    irrelevant for batch read-only sources.
    
    Fixes #2650
---
 .../datasources/SedonaFileIndexHelper.scala        | 66 ++++++++++++++++++++++
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 .../datasources/geopackage/GeoPackageTable.scala   |  9 ++-
 .../sql/datasources/shapefile/ShapefileTable.scala |  9 ++-
 .../metadata/GeoParquetMetadataTable.scala         | 10 +++-
 13 files changed, 166 insertions(+), 12 deletions(-)

diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
new file mode 100644
index 0000000000..8588d7cda1
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+/**
+ * Helper for creating a [[PartitioningAwareFileIndex]] without going through 
the
+ * [[org.apache.spark.sql.execution.streaming.FileStreamSink.hasMetadata]] 
check in
+ * [[org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex]].
+ *
+ * <p>The streaming metadata check can produce spurious 
[[java.io.FileNotFoundException]] warnings
+ * when reading from cloud storage (e.g., S3) because it attempts to stat the 
path as a directory.
+ * For non-streaming, read-only file tables such as Shapefile and GeoPackage, 
this check is
+ * unnecessary and can be safely bypassed.
+ */
+object SedonaFileIndexHelper {
+
+  /**
+   * Build an [[InMemoryFileIndex]] for the given paths, resolving globs if 
necessary, without the
+   * streaming metadata directory check.
+   */
+  def createFileIndex(
+      sparkSession: SparkSession,
+      options: CaseInsensitiveStringMap,
+      paths: Seq[String],
+      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    val globPathsEnabled = Option(options.get("globPaths")).map(_ == 
"true").getOrElse(true)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+      paths,
+      hadoopConf,
+      checkEmptyGlobPath = true,
+      checkFilesExist = true,
+      enableGlobbing = globPathsEnabled)
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(
+      sparkSession,
+      rootPathsSpecified,
+      caseSensitiveMap,
+      userSpecifiedSchema,
+      fileStatusCache)
+  }
+}
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 999aa81280..db3627d912 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -24,7 +24,7 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -42,6 +42,13 @@ case class GeoPackageTable(
     loadOptions: GeoPackageOptions)
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..40ad2606eb 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.StructType
@@ -43,6 +43,13 @@ case class ShapefileTable(
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..66087d1ce9 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -24,7 +24,7 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -42,6 +42,13 @@ case class GeoPackageTable(
     loadOptions: GeoPackageOptions)
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..40ad2606eb 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.StructType
@@ -43,6 +43,13 @@ case class ShapefileTable(
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..66087d1ce9 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -24,7 +24,7 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -42,6 +42,13 @@ case class GeoPackageTable(
     loadOptions: GeoPackageOptions)
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..40ad2606eb 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.StructType
@@ -43,6 +43,13 @@ case class ShapefileTable(
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
index 85dec8427e..66087d1ce9 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala
@@ -24,7 +24,7 @@ import 
org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -42,6 +42,13 @@ case class GeoPackageTable(
     loadOptions: GeoPackageOptions)
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoPackage tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (loadOptions.showMetadata) {
       return MetadataSchema.schema
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
index 7db6bb8d1f..40ad2606eb 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, 
fieldDescriptorsToSchema, mergeSchemas}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.StructType
@@ -43,6 +43,13 @@ case class ShapefileTable(
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // Shapefile tables are always non-streaming batch sources, so the streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "Shapefile"
 
   override def capabilities: java.util.Set[TableCapability] =
diff --git 
a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
 
b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index 845764fae5..abb18a9ddd 100644
--- 
a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ 
b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
PartitioningAwareFileIndex, SedonaFileIndexHelper}
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -36,6 +36,14 @@ case class GeoParquetMetadataTable(
     userSpecifiedSchema: Option[StructType],
     fallbackFileFormat: Class[_ <: FileFormat])
     extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+
+  // Override fileIndex to skip the FileStreamSink.hasMetadata check that 
causes
+  // spurious FileNotFoundException warnings when reading from cloud storage 
(e.g., S3).
+  // GeoParquet metadata tables are always non-streaming batch sources, so the 
streaming
+  // metadata check is unnecessary.
+  override lazy val fileIndex: PartitioningAwareFileIndex =
+    SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, 
userSpecifiedSchema)
+
   override def formatName: String = "GeoParquet Metadata"
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =

Reply via email to