This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit b2762b0774bb8fdd7d99e33dfc764cb4edf23645
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 28 13:29:59 2025 +0800

    [spark] Close ioManager in PaimonPartitionReader (#5542)
---
 .../java/org/apache/paimon/disk/IOManagerImpl.java | 18 +++++++++---
 .../paimon/spark/PaimonPartitionReader.scala       | 34 ++++++++++++++--------
 .../spark/PaimonPartitionReaderFactory.scala       | 24 ++-------------
 3 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index d39c8efb54..53f5090b37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
 /** The facade for the provided I/O manager services. */
 public class IOManagerImpl implements IOManager {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
+    protected static final Logger LOG = LoggerFactory.getLogger(IOManagerImpl.class);
 
     private static final String DIR_NAME_PREFIX = "io";
 
@@ -60,9 +60,7 @@ public class IOManagerImpl implements IOManager {
             LOG.info(
                     "Created a new {} for spilling of task related data to disk (joins, sorting, ...). Used directories:\n\t{}",
                     FileChannelManager.class.getSimpleName(),
-                    Arrays.stream(fileChannelManager.getPaths())
-                            .map(File::getAbsolutePath)
-                            .collect(Collectors.joining("\n\t")));
+                    getSpillingDirectoriesPathsString());
         }
     }
 
@@ -70,6 +68,12 @@ public class IOManagerImpl implements IOManager {
     @Override
     public void close() throws Exception {
         fileChannelManager.close();
+        if (LOG.isInfoEnabled()) {
+            LOG.info(
+                    "Closed {} with directories:\n\t{}",
+                    FileChannelManager.class.getSimpleName(),
+                    getSpillingDirectoriesPathsString());
+        }
     }
 
     @Override
@@ -129,6 +133,12 @@ public class IOManagerImpl implements IOManager {
         return strings;
     }
 
+    private String getSpillingDirectoriesPathsString() {
+        return Arrays.stream(fileChannelManager.getPaths())
+                .map(File::getAbsolutePath)
+                .collect(Collectors.joining("\n\t"));
+    }
+
     @Override
     public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
         return new BufferFileWriterImpl(channelID);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 526178e28e..802fbfdfa6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -19,30 +19,42 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
-import org.apache.paimon.reader.RecordReader
+import org.apache.paimon.disk.IOManager
+import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
+import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.PartitionReader
 
-import java.io.IOException
+import javax.annotation.Nullable
+
+import java.util.{ArrayList => JList}
 
 import scala.collection.JavaConverters._
 
 case class PaimonPartitionReader(
-    readFunc: Split => RecordReader[PaimonInternalRow],
+    readBuilder: ReadBuilder,
     partition: PaimonInputPartition,
-    row: SparkInternalRow,
     metadataColumns: Seq[PaimonMetadataColumn]
 ) extends PartitionReader[InternalRow] {
 
   private val splits: Iterator[Split] = partition.splits.toIterator
-  private var currentRecordReader: PaimonRecordReaderIterator = readSplit()
+  @Nullable private var currentRecordReader = readSplit()
   private var advanced = false
   private var currentRow: PaimonInternalRow = _
+  private val ioManager: IOManager = createIOManager()
+  private val sparkRow: SparkInternalRow = {
+    val dataFields = new JList(readBuilder.readType().getFields)
+    dataFields.addAll(metadataColumns.map(_.toPaimonDataField).asJava)
+    val rowType = new RowType(dataFields)
+    SparkInternalRow.create(rowType)
+  }
+
+  private lazy val read = readBuilder.newRead().withIOManager(ioManager)
 
   override def next(): Boolean = {
     if (currentRecordReader == null) {
@@ -58,7 +70,7 @@
       null
     } else {
       advanced = false
-      row.replace(currentRow)
+      sparkRow.replace(currentRow)
     }
   }
 
@@ -89,8 +101,7 @@
   private def readSplit(): PaimonRecordReaderIterator = {
     if (splits.hasNext) {
       val split = splits.next();
-      val reader = readFunc(split)
-      PaimonRecordReaderIterator(reader, metadataColumns, split)
+      PaimonRecordReaderIterator(read.createReader(split), metadataColumns, split)
     } else {
       null
     }
@@ -117,9 +128,8 @@
       if (currentRecordReader != null) {
        currentRecordReader.close()
       }
-    } catch {
-      case e: Exception =>
-        throw new IOException(e)
+    } finally {
+      ioManager.close()
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
index 59b07a7944..d5ba5485ff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
@@ -18,43 +18,23 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
-import org.apache.paimon.disk.IOManager
-import org.apache.paimon.reader.RecordReader
-import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.source.{ReadBuilder, Split}
-import org.apache.paimon.types.RowType
+import org.apache.paimon.table.source.ReadBuilder
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 
-import java.util.{ArrayList => JList}
 import java.util.Objects
 
-import scala.collection.JavaConverters._
-
 case class PaimonPartitionReaderFactory(
     readBuilder: ReadBuilder,
     metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
   extends PartitionReaderFactory {
 
-  private lazy val ioManager: IOManager = createIOManager()
-
-  private lazy val row: SparkInternalRow = {
-    val dataFields = new JList(readBuilder.readType().getFields)
-    dataFields.addAll(metadataColumns.map(_.toPaimonDataField).asJava)
-    val rowType = new RowType(dataFields)
-    SparkInternalRow.create(rowType)
-  }
-
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     partition match {
       case paimonInputPartition: PaimonInputPartition =>
-        val readFunc: Split => RecordReader[PaimonInternalRow] =
-          (split: Split) => readBuilder.newRead().withIOManager(ioManager).createReader(split)
-        PaimonPartitionReader(readFunc, paimonInputPartition, row, metadataColumns)
+        PaimonPartitionReader(readBuilder, paimonInputPartition, metadataColumns)
       case _ =>
         throw new RuntimeException(s"It's not a Paimon input partition, $partition")
     }
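The gist of the fix: the spill IOManager used to be created lazily in PaimonPartitionReaderFactory, shared by every reader the factory produced, and never closed; after this change each PaimonPartitionReader creates its own IOManager and releases it in close(), even when closing the current record reader throws. Below is a minimal sketch of that ownership pattern; the IOManager, Reader, and PartitionReaderSketch names are simplified stand-ins for illustration, not the actual Paimon or Spark interfaces.

    // Sketch only: simplified stand-ins for the real Paimon/Spark types.
    trait IOManager extends AutoCloseable
    trait Reader extends AutoCloseable

    class PartitionReaderSketch(
        newIOManager: () => IOManager,
        openReader: IOManager => Reader)
      extends AutoCloseable {

      // Each partition reader owns its own spill IOManager for its whole lifetime.
      private val ioManager: IOManager = newIOManager()
      private val reader: Reader = openReader(ioManager)

      override def close(): Unit = {
        try {
          reader.close() // may throw
        } finally {
          ioManager.close() // always release the spill resources
        }
      }
    }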
