This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit b2762b0774bb8fdd7d99e33dfc764cb4edf23645
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 28 13:29:59 2025 +0800

    [spark] Close ioManager in PaimonPartitionReader (#5542)
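
    Before this change, PaimonPartitionReaderFactory lazily created a single
    IOManager and SparkInternalRow shared across the readers it produced, and
    the IOManager was never closed. PaimonPartitionReader now builds its
    TableRead from the ReadBuilder, owns its own IOManager, and closes it in
    the finally block of close(). IOManagerImpl additionally logs the spilling
    directories it releases when closed.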
---
 .../java/org/apache/paimon/disk/IOManagerImpl.java | 18 +++++++++---
 .../paimon/spark/PaimonPartitionReader.scala       | 34 ++++++++++++++--------
 .../spark/PaimonPartitionReaderFactory.scala       | 24 ++-------------
 3 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index d39c8efb54..53f5090b37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
 /** The facade for the provided I/O manager services. */
 public class IOManagerImpl implements IOManager {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
+    protected static final Logger LOG = LoggerFactory.getLogger(IOManagerImpl.class);
 
     private static final String DIR_NAME_PREFIX = "io";
 
@@ -60,9 +60,7 @@ public class IOManagerImpl implements IOManager {
             LOG.info(
                     "Created a new {} for spilling of task related data to 
disk (joins, sorting, ...). Used directories:\n\t{}",
                     FileChannelManager.class.getSimpleName(),
-                    Arrays.stream(fileChannelManager.getPaths())
-                            .map(File::getAbsolutePath)
-                            .collect(Collectors.joining("\n\t")));
+                    getSpillingDirectoriesPathsString());
         }
     }
 
@@ -70,6 +68,12 @@ public class IOManagerImpl implements IOManager {
     @Override
     public void close() throws Exception {
         fileChannelManager.close();
+        if (LOG.isInfoEnabled()) {
+            LOG.info(
+                    "Closed {} with directories:\n\t{}",
+                    FileChannelManager.class.getSimpleName(),
+                    getSpillingDirectoriesPathsString());
+        }
     }
 
     @Override
@@ -129,6 +133,12 @@ public class IOManagerImpl implements IOManager {
         return strings;
     }
 
+    private String getSpillingDirectoriesPathsString() {
+        return Arrays.stream(fileChannelManager.getPaths())
+                .map(File::getAbsolutePath)
+                .collect(Collectors.joining("\n\t"));
+    }
+
     @Override
     public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
         return new BufferFileWriterImpl(channelID);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 526178e28e..802fbfdfa6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -19,30 +19,42 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
-import org.apache.paimon.reader.RecordReader
+import org.apache.paimon.disk.IOManager
+import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
+import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.PartitionReader
 
-import java.io.IOException
+import javax.annotation.Nullable
+
+import java.util.{ArrayList => JList}
 
 import scala.collection.JavaConverters._
 
 case class PaimonPartitionReader(
-    readFunc: Split => RecordReader[PaimonInternalRow],
+    readBuilder: ReadBuilder,
     partition: PaimonInputPartition,
-    row: SparkInternalRow,
     metadataColumns: Seq[PaimonMetadataColumn]
 ) extends PartitionReader[InternalRow] {
 
   private val splits: Iterator[Split] = partition.splits.toIterator
-  private var currentRecordReader: PaimonRecordReaderIterator = readSplit()
+  @Nullable private var currentRecordReader = readSplit()
   private var advanced = false
   private var currentRow: PaimonInternalRow = _
+  private val ioManager: IOManager = createIOManager()
+  private val sparkRow: SparkInternalRow = {
+    val dataFields = new JList(readBuilder.readType().getFields)
+    dataFields.addAll(metadataColumns.map(_.toPaimonDataField).asJava)
+    val rowType = new RowType(dataFields)
+    SparkInternalRow.create(rowType)
+  }
+
+  private lazy val read = readBuilder.newRead().withIOManager(ioManager)
 
   override def next(): Boolean = {
     if (currentRecordReader == null) {
@@ -58,7 +70,7 @@ case class PaimonPartitionReader(
       null
     } else {
       advanced = false
-      row.replace(currentRow)
+      sparkRow.replace(currentRow)
     }
   }
 
@@ -89,8 +101,7 @@ case class PaimonPartitionReader(
   private def readSplit(): PaimonRecordReaderIterator = {
     if (splits.hasNext) {
       val split = splits.next();
-      val reader = readFunc(split)
-      PaimonRecordReaderIterator(reader, metadataColumns, split)
+      PaimonRecordReaderIterator(read.createReader(split), metadataColumns, split)
     } else {
       null
     }
@@ -117,9 +128,8 @@ case class PaimonPartitionReader(
       if (currentRecordReader != null) {
         currentRecordReader.close()
       }
-    } catch {
-      case e: Exception =>
-        throw new IOException(e)
+    } finally {
+      ioManager.close()
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
index 59b07a7944..d5ba5485ff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
@@ -18,43 +18,23 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
-import org.apache.paimon.disk.IOManager
-import org.apache.paimon.reader.RecordReader
-import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.source.{ReadBuilder, Split}
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.source.ReadBuilder
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 
-import java.util.{ArrayList => JList}
 import java.util.Objects
 
-import scala.collection.JavaConverters._
-
 case class PaimonPartitionReaderFactory(
     readBuilder: ReadBuilder,
     metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
   extends PartitionReaderFactory {
 
-  private lazy val ioManager: IOManager = createIOManager()
-
-  private lazy val row: SparkInternalRow = {
-    val dataFields = new JList(readBuilder.readType().getFields)
-    dataFields.addAll(metadataColumns.map(_.toPaimonDataField).asJava)
-    val rowType = new RowType(dataFields)
-    SparkInternalRow.create(rowType)
-  }
-
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     partition match {
       case paimonInputPartition: PaimonInputPartition =>
-        val readFunc: Split => RecordReader[PaimonInternalRow] =
-          (split: Split) => readBuilder.newRead().withIOManager(ioManager).createReader(split)
-        PaimonPartitionReader(readFunc, paimonInputPartition, row, metadataColumns)
+        PaimonPartitionReader(readBuilder, paimonInputPartition, metadataColumns)
       case _ =>
         throw new RuntimeException(s"It's not a Paimon input partition, $partition")
     }

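For quick reference, below is a minimal sketch of the per-reader IOManager
lifecycle this patch adopts. The ReadBuilder, Split, RecordReader, IOManager
types and createIOManager come from the patch above; the SpillingSplitReader
class and its method names are illustrative only, not part of Paimon.

import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
import org.apache.paimon.disk.IOManager
import org.apache.paimon.reader.RecordReader
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.table.source.{ReadBuilder, Split}

// Illustrative only: one IOManager per reader, released in close() even if
// closing the current record reader throws.
class SpillingSplitReader(readBuilder: ReadBuilder) extends AutoCloseable {

  // created when the reader is constructed, owned by this reader alone
  private val ioManager: IOManager = createIOManager()

  // the TableRead spills through this reader's IOManager
  private lazy val read = readBuilder.newRead().withIOManager(ioManager)

  def open(split: Split): RecordReader[PaimonInternalRow] =
    read.createReader(split)

  override def close(): Unit = {
    try {
      // close the currently open record reader here, as PaimonPartitionReader does
    } finally {
      ioManager.close() // always free the spill directories
    }
  }
}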