This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch fix-issue-with-reading-geoparquet-metadata
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 74395058a37d09465952c125cdf1f4910cfef529
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Jan 7 23:13:24 2025 +0100

    Fix issue with not closing parquet files.
---
 .../GeoParquetMetadataPartitionReaderFactory.scala | 21 +++++++++++++++-----
 .../GeoParquetMetadataPartitionReaderFactory.scala | 22 ++++++++++++++++-----
 .../GeoParquetMetadataPartitionReaderFactory.scala | 23 +++++++++++++++++-----
 3 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 1fe2faa2e0..16227c90f6 100644
--- a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -66,12 +66,23 @@ object GeoParquetMetadataPartitionReaderFactory {
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
+    val reader = ParquetFileReader
+      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
+
+    try {
+      readFile(configuration, partitionedFile, readDataSchema, reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  private def readFile(
+      configuration: Configuration,
+      partitionedFile: PartitionedFile,
+      readDataSchema: StructType,
+      reader: ParquetFileReader): Iterator[InternalRow] = {
     val filePath = partitionedFile.filePath
-    val metadata = ParquetFileReader
-      .open(HadoopInputFile.fromPath(new Path(filePath), configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
        val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..6d548deb43 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -62,16 +62,28 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
+
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val filePath = partitionedFile.toPath.toString
-    val metadata = ParquetFileReader
+    val reader = ParquetFileReader
       .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+
+    try {
+      readFile(configuration, partitionedFile, readDataSchema, reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  private def readFile(
+      configuration: Configuration,
+      partitionedFile: PartitionedFile,
+      readDataSchema: StructType,
+      reader: ParquetFileReader): Iterator[InternalRow] = {
+    val filePath = partitionedFile.toPath.toString
+    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
        val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..f7e2f56f7e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -62,16 +62,29 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
+
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val filePath = partitionedFile.toPath.toString
-    val metadata = ParquetFileReader
+    val reader = ParquetFileReader
       .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+
+    try {
+      readFile(configuration, partitionedFile, readDataSchema, reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  private def readFile(
+      configuration: Configuration,
+      partitionedFile: PartitionedFile,
+      readDataSchema: StructType,
+      reader: ParquetFileReader): Iterator[InternalRow] = {
+    val filePath = partitionedFile.toPath.toString
+
+    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
        val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>

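For reference, the pattern this patch applies in all three Spark modules, shown as a minimal self-contained sketch: open the ParquetFileReader up front, read only the footer's key-value metadata, and close the reader in a finally block so the underlying input stream is released even if parsing fails. The object name, file path, and Configuration below are illustrative placeholders, not part of the Sedona sources.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Hypothetical standalone example; Sedona's readFile follows the same shape.
object FooterMetadataExample {
  def readKeyValueMetadata(path: String, conf: Configuration): java.util.Map[String, String] = {
    // Open the reader eagerly so it can be closed deterministically below.
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf))
    try {
      // Only the footer is needed for GeoParquet metadata; no row groups are read.
      reader.getFooter.getFileMetaData.getKeyValueMetaData
    } finally {
      // Release the underlying input stream even if reading the footer throws.
      reader.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val metadata = readKeyValueMetadata("example.parquet", new Configuration())
    // GeoParquet stores its column metadata as JSON under the "geo" key.
    println(metadata.get("geo"))
  }
}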