This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 43b898a617 [spark] Close ioManager in PaimonPartitionReader (#5542)
43b898a617 is described below
commit 43b898a617adf3593c024f38db4e1bc43c61844b
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 28 13:29:59 2025 +0800
[spark] Close ioManager in PaimonPartitionReader (#5542)
---
.../java/org/apache/paimon/disk/IOManagerImpl.java | 18 +++++++++---
.../paimon/spark/PaimonPartitionReader.scala | 34 ++++++++++++++--------
.../spark/PaimonPartitionReaderFactory.scala | 24 ++-------------
3 files changed, 38 insertions(+), 38 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index d39c8efb54..53f5090b37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
/** The facade for the provided I/O manager services. */
public class IOManagerImpl implements IOManager {
- protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(IOManagerImpl.class);
private static final String DIR_NAME_PREFIX = "io";
@@ -60,9 +60,7 @@ public class IOManagerImpl implements IOManager {
LOG.info(
"Created a new {} for spilling of task related data to
disk (joins, sorting, ...). Used directories:\n\t{}",
FileChannelManager.class.getSimpleName(),
- Arrays.stream(fileChannelManager.getPaths())
- .map(File::getAbsolutePath)
- .collect(Collectors.joining("\n\t")));
+ getSpillingDirectoriesPathsString());
}
}
@@ -70,6 +68,12 @@ public class IOManagerImpl implements IOManager {
@Override
public void close() throws Exception {
fileChannelManager.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Closed {} with directories:\n\t{}",
+ FileChannelManager.class.getSimpleName(),
+ getSpillingDirectoriesPathsString());
+ }
}
@Override
@@ -129,6 +133,12 @@ public class IOManagerImpl implements IOManager {
return strings;
}
+ private String getSpillingDirectoriesPathsString() {
+ return Arrays.stream(fileChannelManager.getPaths())
+ .map(File::getAbsolutePath)
+ .collect(Collectors.joining("\n\t"));
+ }
+
@Override
public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
throws IOException {
return new BufferFileWriterImpl(channelID);
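With the extracted helper above, IOManagerImpl logs the same spilling-directory list when it is created and, after this change, when it is closed. A hedged Scala sketch of a caller exercising that lifecycle (createIOManager comes from the Spark module, as in the reader change below; the body of the try block is elided and only illustrative):

import org.apache.paimon.disk.IOManager
import org.apache.paimon.spark.SparkUtils.createIOManager

object IOManagerLifecycleSketch {
  def run(): Unit = {
    // Creating the IOManager logs the spilling directories at INFO level ...
    val ioManager: IOManager = createIOManager()
    try {
      // ... e.g. hand it to readBuilder.newRead().withIOManager(ioManager) ...
    } finally {
      // ... and with this commit, closing it logs the same directories again.
      ioManager.close()
    }
  }
}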
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 526178e28e..802fbfdfa6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -19,30 +19,42 @@
package org.apache.paimon.spark
import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
-import org.apache.paimon.reader.RecordReader
+import org.apache.paimon.disk.IOManager
+import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.spark.data.SparkInternalRow
import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
+import org.apache.paimon.types.RowType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.PartitionReader
-import java.io.IOException
+import javax.annotation.Nullable
+
+import java.util.{ArrayList => JList}
import scala.collection.JavaConverters._
case class PaimonPartitionReader(
- readFunc: Split => RecordReader[PaimonInternalRow],
+ readBuilder: ReadBuilder,
partition: PaimonInputPartition,
- row: SparkInternalRow,
metadataColumns: Seq[PaimonMetadataColumn]
) extends PartitionReader[InternalRow] {
private val splits: Iterator[Split] = partition.splits.toIterator
- private var currentRecordReader: PaimonRecordReaderIterator = readSplit()
+ @Nullable private var currentRecordReader = readSplit()
private var advanced = false
private var currentRow: PaimonInternalRow = _
+ private val ioManager: IOManager = createIOManager()
+ private val sparkRow: SparkInternalRow = {
+ val dataFields = new JList(readBuilder.readType().getFields)
+ dataFields.addAll(metadataColumns.map(_.toPaimonDataField).asJava)
+ val rowType = new RowType(dataFields)
+ SparkInternalRow.create(rowType)
+ }
+
+ private lazy val read = readBuilder.newRead().withIOManager(ioManager)
override def next(): Boolean = {
if (currentRecordReader == null) {
@@ -58,7 +70,7 @@ case class PaimonPartitionReader(
null
} else {
advanced = false
- row.replace(currentRow)
+ sparkRow.replace(currentRow)
}
}
@@ -89,8 +101,7 @@ case class PaimonPartitionReader(
private def readSplit(): PaimonRecordReaderIterator = {
if (splits.hasNext) {
val split = splits.next();
- val reader = readFunc(split)
- PaimonRecordReaderIterator(reader, metadataColumns, split)
+ PaimonRecordReaderIterator(read.createReader(split), metadataColumns, split)
} else {
null
}
@@ -117,9 +128,8 @@ case class PaimonPartitionReader(
if (currentRecordReader != null) {
currentRecordReader.close()
}
- } catch {
- case e: Exception =>
- throw new IOException(e)
+ } finally {
+ ioManager.close()
}
}
}
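Only the changed lines of next() and readSplit() appear above; the underlying pattern is to hold a nullable current iterator and advance split by split, closing each exhausted reader before opening the next. A self-contained hedged sketch of that pattern (CloseableIterator here is a stand-in, not the actual PaimonRecordReaderIterator API):

// Stand-in for an iterator that must be closed once its split is exhausted.
trait CloseableIterator[T] extends Iterator[T] with AutoCloseable

class SplitAdvancer[T](readers: Iterator[CloseableIterator[T]]) {
  private var current: CloseableIterator[T] =
    if (readers.hasNext) readers.next() else null

  // Returns the next record, or None once every split has been read.
  def nextRecord(): Option[T] = {
    while (current != null) {
      if (current.hasNext) {
        return Some(current.next())
      }
      current.close()                                         // finished this split
      current = if (readers.hasNext) readers.next() else null // null when no splits remain
    }
    None
  }
}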
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
index 59b07a7944..d5ba5485ff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
@@ -18,43 +18,23 @@
package org.apache.paimon.spark
-import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
-import org.apache.paimon.disk.IOManager
-import org.apache.paimon.reader.RecordReader
-import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.spark.data.SparkInternalRow
import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.source.{ReadBuilder, Split}
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.source.ReadBuilder
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
-import java.util.{ArrayList => JList}
import java.util.Objects
-import scala.collection.JavaConverters._
-
case class PaimonPartitionReaderFactory(
readBuilder: ReadBuilder,
metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
extends PartitionReaderFactory {
- private lazy val ioManager: IOManager = createIOManager()
-
- private lazy val row: SparkInternalRow = {
- val dataFields = new JList(readBuilder.readType().getFields)
- dataFields.addAll(metadataColumns.map(_.toPaimonDataField).asJava)
- val rowType = new RowType(dataFields)
- SparkInternalRow.create(rowType)
- }
-
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
partition match {
case paimonInputPartition: PaimonInputPartition =>
- val readFunc: Split => RecordReader[PaimonInternalRow] =
- (split: Split) => readBuilder.newRead().withIOManager(ioManager).createReader(split)
- PaimonPartitionReader(readFunc, paimonInputPartition, row, metadataColumns)
+ PaimonPartitionReader(readBuilder, paimonInputPartition, metadataColumns)
case _ =>
throw new RuntimeException(s"It's not a Paimon input partition, $partition")
}
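For reference, a hedged sketch of how the updated factory and reader are driven end to end (Spark performs this loop internally; readBuilder and partition are assumed to come from the scan, and processRow is a placeholder):

import org.apache.paimon.spark.PaimonPartitionReaderFactory
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}

object ReaderDriverSketch {
  // The factory now only carries the ReadBuilder; each reader it creates owns
  // (and in close() releases) its own IOManager.
  def consume(factory: PaimonPartitionReaderFactory, partition: InputPartition)(
      processRow: InternalRow => Unit): Unit = {
    val reader: PartitionReader[InternalRow] = factory.createReader(partition)
    try {
      while (reader.next()) {
        processRow(reader.get())
      }
    } finally {
      reader.close() // closes the current record reader, then the per-reader IOManager
    }
  }
}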