This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch fix/GH-2650-shapefile-s3-warning in repository https://gitbox.apache.org/repos/asf/sedona.git
commit d95e9274a2a3ac45ac94143cd2c0c370be4318d0 Author: Jia Yu <[email protected]> AuthorDate: Sat Feb 14 23:54:10 2026 -0800 Fix warning message when reading shapefiles from S3 Override fileIndex in ShapefileTable, GeoPackageTable, and GeoParquetMetadataTable to skip the FileStreamSink.hasMetadata check. Spark's FileTable.fileIndex calls FileStreamSink.hasMetadata which tries to stat the input paths as directories. For shapefile paths that get transformed to glob patterns (e.g., file.???), this causes FileNotFoundException warnings on S3. The fix creates SedonaFileIndexHelper in the org.apache.spark.sql.execution.datasources package to access the package-private DataSource.checkAndGlobPathIfNecessary method directly, bypassing the streaming metadata check that is irrelevant for batch read-only sources. Fixes #2650 --- .../datasources/SedonaFileIndexHelper.scala | 66 ++++++++++++++++++++++ .../datasources/geopackage/GeoPackageTable.scala | 9 ++- .../sql/datasources/shapefile/ShapefileTable.scala | 9 ++- .../metadata/GeoParquetMetadataTable.scala | 10 +++- .../datasources/geopackage/GeoPackageTable.scala | 9 ++- .../sql/datasources/shapefile/ShapefileTable.scala | 9 ++- .../metadata/GeoParquetMetadataTable.scala | 10 +++- .../datasources/geopackage/GeoPackageTable.scala | 9 ++- .../sql/datasources/shapefile/ShapefileTable.scala | 9 ++- .../metadata/GeoParquetMetadataTable.scala | 10 +++- .../datasources/geopackage/GeoPackageTable.scala | 9 ++- .../sql/datasources/shapefile/ShapefileTable.scala | 9 ++- .../metadata/GeoParquetMetadataTable.scala | 10 +++- 13 files changed, 166 insertions(+), 12 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala new file mode 100644 index 0000000000..8588d7cda1 --- /dev/null +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/SedonaFileIndexHelper.scala @@ -0,0 
+1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ + +/** + * Helper for creating a [[PartitioningAwareFileIndex]] without going through the + * [[org.apache.spark.sql.execution.streaming.FileStreamSink.hasMetadata]] check in + * [[org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex]]. + * + * <p>The streaming metadata check can produce spurious [[java.io.FileNotFoundException]] warnings + * when reading from cloud storage (e.g., S3) because it attempts to stat the path as a directory. + * For non-streaming, read-only file tables such as Shapefile and GeoPackage, this check is + * unnecessary and can be safely bypassed. + */ +object SedonaFileIndexHelper { + + /** + * Build an [[InMemoryFileIndex]] for the given paths, resolving globs if necessary, without the + * streaming metadata directory check. 
+ */ + def createFileIndex( + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + val globPathsEnabled = Option(options.get("globPaths")).map(_ == "true").getOrElse(true) + val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary( + paths, + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + enableGlobbing = globPathsEnabled) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + new InMemoryFileIndex( + sparkSession, + rootPathsSpecified, + caseSensitiveMap, + userSpecifiedSchema, + fileStatusCache) + } +} diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 999aa81280..db3627d912 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -24,7 +24,7 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -42,6 +42,13 @@ case class 
GeoPackageTable( loadOptions: GeoPackageOptions) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // GeoPackage tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. + override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { if (loadOptions.showMetadata) { return MetadataSchema.schema diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala index 7db6bb8d1f..40ad2606eb 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType @@ -43,6 +43,13 @@ case class ShapefileTable( fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the 
FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // Shapefile tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. + override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "Shapefile" override def capabilities: java.util.Set[TableCapability] = diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala index 845764fae5..abb18a9ddd 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -36,6 +36,14 @@ case class GeoParquetMetadataTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException 
warnings when reading from cloud storage (e.g., S3). + // GeoParquet metadata tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. + override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "GeoParquet Metadata" override def inferSchema(files: Seq[FileStatus]): Option[StructType] = diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 85dec8427e..66087d1ce9 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -24,7 +24,7 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -42,6 +42,13 @@ case class GeoPackageTable( loadOptions: GeoPackageOptions) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). 
+ // GeoPackage tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. + override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { if (loadOptions.showMetadata) { return MetadataSchema.schema diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala index 7db6bb8d1f..40ad2606eb 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType @@ -43,6 +43,13 @@ case class ShapefileTable( fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // Shapefile tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "Shapefile" override def capabilities: java.util.Set[TableCapability] = diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala index 845764fae5..abb18a9ddd 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -36,6 +36,14 @@ case class GeoParquetMetadataTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // GeoParquet metadata tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "GeoParquet Metadata" override def inferSchema(files: Seq[FileStatus]): Option[StructType] = diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 85dec8427e..66087d1ce9 100644 --- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -24,7 +24,7 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -42,6 +42,13 @@ case class GeoPackageTable( loadOptions: GeoPackageOptions) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // GeoPackage tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { if (loadOptions.showMetadata) { return MetadataSchema.schema diff --git a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala index 7db6bb8d1f..40ad2606eb 100644 --- a/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala +++ b/spark/spark-4.0/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType @@ -43,6 +43,13 @@ case class ShapefileTable( fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // Shapefile tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "Shapefile" override def capabilities: java.util.Set[TableCapability] = diff --git a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala index 845764fae5..abb18a9ddd 100644 --- a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala +++ b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -36,6 +36,14 @@ case class GeoParquetMetadataTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // GeoParquet metadata tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "GeoParquet Metadata" override def inferSchema(files: Seq[FileStatus]): Option[StructType] = diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala index 85dec8427e..66087d1ce9 100644 --- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala +++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/geopackage/GeoPackageTable.scala @@ -24,7 +24,7 @@ import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, Me import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -42,6 +42,13 @@ case class GeoPackageTable( loadOptions: GeoPackageOptions) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // GeoPackage tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = { if (loadOptions.showMetadata) { return MetadataSchema.schema diff --git a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala index 7db6bb8d1f..40ad2606eb 100644 --- a/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala +++ b/spark/spark-4.1/src/main/scala/org/apache/sedona/sql/datasources/shapefile/ShapefileTable.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.sedona.sql.datasources.shapefile.ShapefileUtils.{baseSchema, fieldDescriptorsToSchema, mergeSchemas} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.StructType @@ -43,6 +43,13 @@ case class ShapefileTable( fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // Shapefile tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "Shapefile" override def capabilities: java.util.Set[TableCapability] = diff --git a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala index 845764fae5..abb18a9ddd 100644 --- a/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala +++ b/spark/spark-4.1/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.TableCapability import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitioningAwareFileIndex, SedonaFileIndexHelper} import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -36,6 +36,14 @@ case class GeoParquetMetadataTable( userSpecifiedSchema: Option[StructType], fallbackFileFormat: Class[_ <: FileFormat]) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + + // Override fileIndex to skip the FileStreamSink.hasMetadata check that causes + // spurious FileNotFoundException warnings when reading from cloud storage (e.g., S3). + // GeoParquet metadata tables are always non-streaming batch sources, so the streaming + // metadata check is unnecessary. 
+ override lazy val fileIndex: PartitioningAwareFileIndex = + SedonaFileIndexHelper.createFileIndex(sparkSession, options, paths, userSpecifiedSchema) + override def formatName: String = "GeoParquet Metadata" override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
