This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch fix-issue-with-reading-geoparquet-metadata
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 74395058a37d09465952c125cdf1f4910cfef529
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Jan 7 23:13:24 2025 +0100

    Fix issue with not closing parquet files.
---
 .../GeoParquetMetadataPartitionReaderFactory.scala | 21 +++++++++++++++-----
 .../GeoParquetMetadataPartitionReaderFactory.scala | 22 ++++++++++++++++-----
 .../GeoParquetMetadataPartitionReaderFactory.scala | 23 +++++++++++++++++-----
 3 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 1fe2faa2e0..16227c90f6 100644
--- a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,12 +66,23 @@ object GeoParquetMetadataPartitionReaderFactory {
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
+    val reader = ParquetFileReader
+      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
+
+    try {
+      readFile(configuration, partitionedFile, readDataSchema, reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  private def readFile(
+      configuration: Configuration,
+      partitionedFile: PartitionedFile,
+      readDataSchema: StructType,
+      reader: ParquetFileReader): Iterator[InternalRow] = {
     val filePath = partitionedFile.filePath
-    val metadata = ParquetFileReader
-      .open(HadoopInputFile.fromPath(new Path(filePath), configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..6d548deb43 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -62,16 +62,28 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
+
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val filePath = partitionedFile.toPath.toString
-    val metadata = ParquetFileReader
+    val reader = ParquetFileReader
       .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+
+    try {
+      readFile(configuration, partitionedFile, readDataSchema, reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  private def readFile(
+      configuration: Configuration,
+      partitionedFile: PartitionedFile,
+      readDataSchema: StructType,
+      reader: ParquetFileReader): Iterator[InternalRow] = {
+    val filePath = partitionedFile.toPath.toString
+    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..f7e2f56f7e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -62,16 +62,29 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
+
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val filePath = partitionedFile.toPath.toString
-    val metadata = ParquetFileReader
+    val reader = ParquetFileReader
       .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+
+    try {
+      readFile(configuration, partitionedFile, readDataSchema, reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  private def readFile(
+      configuration: Configuration,
+      partitionedFile: PartitionedFile,
+      readDataSchema: StructType,
+      reader: ParquetFileReader): Iterator[InternalRow] = {
+    val filePath = partitionedFile.toPath.toString
+
+    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
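For context, the pattern applied in all three Spark modules is the same: open the ParquetFileReader once, read the footer's key-value metadata, and close the reader in a finally block so the underlying input stream is released even if metadata parsing throws. The sketch below shows that pattern in isolation, outside Spark's PartitionedFile plumbing; the object name GeoParquetFooterSketch and the helper readGeoMetadata are illustrative only and are not part of this commit.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    object GeoParquetFooterSketch {
      // Reads the "geo" footer entry of a Parquet file and guarantees the
      // reader (and its underlying input stream) is closed, mirroring the
      // try/finally introduced by this commit.
      def readGeoMetadata(path: String, conf: Configuration): Option[String] = {
        val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf))
        try {
          // Footer key-value metadata is a java.util.Map[String, String];
          // GeoParquet writers store their JSON metadata under the "geo" key.
          Option(reader.getFooter.getFileMetaData.getKeyValueMetaData.get("geo"))
        } finally {
          reader.close()
        }
      }
    }

On Scala 2.13 the same guarantee could be expressed with scala.util.Using, since ParquetFileReader is Closeable, but a plain try/finally compiles unchanged across the Scala versions the spark-3.3 through spark-3.5 modules build against.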
