This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aa9026a8 [core] Close IOManager to clear directories (#2174)
9aa9026a8 is described below

commit 9aa9026a842a702a93070b85e49270a746293656
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 24 20:47:24 2023 +0800

    [core] Close IOManager to clear directories (#2174)
---
 .../org/apache/paimon/utils/AsyncRecordReader.java |  8 ++++----
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  8 +++++---
 .../sink/index/GlobalIndexAssignerOperator.java    |  9 ++++++---
 .../apache/paimon/flink/sorter/SortOperator.java   | 23 +++++++++++++++-------
 .../paimon/flink/source/FileStoreSourceReader.java | 15 ++++++++++++++
 .../apache/paimon/flink/source/FlinkSource.java    |  8 ++------
 .../align/AlignedContinuousFileStoreSource.java    |  6 +++---
 .../flink/source/align/AlignedSourceReader.java    |  4 +++-
 .../source/operator/MultiTablesReadOperator.java   | 14 ++++++++++---
 .../paimon/flink/source/operator/ReadOperator.java | 15 +++++++++++---
 .../flink/source/FileStoreSourceReaderTest.java    |  1 +
 .../source/align/AlignedSourceReaderTest.java      |  1 +
 .../paimon/spark/SparkInputPartitionReader.java    | 10 +++++++++-
 .../apache/paimon/spark/SparkReaderFactory.java    |  6 ++++--
 .../spark/commands/WriteIntoPaimonTable.scala      |  4 +++-
 15 files changed, 95 insertions(+), 37 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
index 123b02453..c7563976b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
@@ -39,17 +39,17 @@ public class AsyncRecordReader<T> implements 
RecordReader<T> {
 
     private final BlockingQueue<Element> queue;
     private final Future<Void> future;
-    private final ClassLoader classLoader;
 
     private boolean isEnd = false;
 
     public AsyncRecordReader(IOExceptionSupplier<RecordReader<T>> supplier) {
         this.queue = new LinkedBlockingQueue<>();
-        this.future = ASYNC_EXECUTOR.submit(() -> asyncRead(supplier));
-        this.classLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        this.future = ASYNC_EXECUTOR.submit(() -> asyncRead(supplier, 
classLoader));
     }
 
-    private Void asyncRead(IOExceptionSupplier<RecordReader<T>> supplier) 
throws IOException {
+    private Void asyncRead(IOExceptionSupplier<RecordReader<T>> supplier, 
ClassLoader classLoader)
+            throws IOException {
         // set classloader, otherwise, its classloader belongs to its creator. 
It is possible that
         // its creator's classloader has already exited, which will cause 
subsequent reads to report
         // exceptions
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index d1baa5e85..8cf7a64e0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -52,7 +52,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
 
     protected final String commitUser;
     protected final StoreSinkWriteState state;
-    private final IOManager ioManager;
+    private final IOManagerImpl paimonIOManager;
     private final boolean ignorePreviousFiles;
     private final boolean waitCompaction;
     private final boolean isStreamingMode;
@@ -115,7 +115,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             @Nullable MemoryPoolFactory memoryPoolFactory) {
         this.commitUser = commitUser;
         this.state = state;
-        this.ioManager = ioManager;
+        this.paimonIOManager = new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
         this.ignorePreviousFiles = ignorePreviousFiles;
         this.waitCompaction = waitCompaction;
         this.isStreamingMode = isStreamingMode;
@@ -134,7 +134,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                                 commitUser,
                                 (part, bucket) ->
                                         
state.stateValueFilter().filter(table.name(), part, bucket))
-                        .withIOManager(new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+                        .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles)
                         .isStreamingMode(isStreamingMode);
 
@@ -216,6 +216,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         if (write != null) {
             write.close();
         }
+
+        paimonIOManager.close();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index c04eb13cf..19428155f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
 
 /** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
@@ -55,6 +54,7 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
     private final SerializableFunction<T, InternalRow> toRow;
     private final SerializableFunction<InternalRow, T> fromRow;
 
+    private transient IOManager ioManager;
     private transient RowBuffer bootstrapBuffer;
 
     public GlobalIndexAssignerOperator(
@@ -75,7 +75,7 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
                 getContainingTask().getEnvironment().getIOManager();
         File[] tmpDirs = flinkIoManager.getSpillingDirectories();
         File tmpDir = 
tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
-        IOManager ioManager = 
IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
+        ioManager = 
IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
         assigner.open(
                 ioManager,
                 tmpDir,
@@ -143,8 +143,11 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() throws Exception {
         this.assigner.close();
+        if (ioManager != null) {
+            ioManager.close();
+        }
     }
 
     public static GlobalIndexAssignerOperator<InternalRow> forRowData(Table 
table) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index 223f1e6e0..5d16c445a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -27,7 +27,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
@@ -50,7 +50,9 @@ public class SortOperator extends 
TableStreamOperator<InternalRow>
     private final int pageSize;
     private final int arity;
     private final int spillSortMaxNumFiles;
+
     private transient BinaryExternalSortBuffer buffer;
+    private transient IOManager ioManager;
 
     public SortOperator(
             RowType keyType,
@@ -87,17 +89,19 @@ public class SortOperator extends 
TableStreamOperator<InternalRow>
                 BinaryInMemorySortBuffer.createBuffer(
                         normalizedKeyComputer, serializer, keyComparator, 
memoryPool);
 
+        this.ioManager =
+                IOManager.create(
+                        getContainingTask()
+                                .getEnvironment()
+                                .getIOManager()
+                                .getSpillingDirectoriesPaths());
         buffer =
                 new BinaryExternalSortBuffer(
                         new BinaryRowSerializer(serializer.getArity()),
                         keyComparator,
                         memoryPool.pageSize(),
                         inMemorySortBuffer,
-                        new IOManagerImpl(
-                                getContainingTask()
-                                        .getEnvironment()
-                                        .getIOManager()
-                                        .getSpillingDirectoriesPaths()),
+                        ioManager,
                         spillSortMaxNumFiles);
     }
 
@@ -115,7 +119,12 @@ public class SortOperator extends 
TableStreamOperator<InternalRow>
     @Override
     public void close() throws Exception {
         super.close();
-        buffer.clear();
+        if (buffer != null) {
+            buffer.clear();
+        }
+        if (ioManager != null) {
+            ioManager.close();
+        }
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 08713da4c..155fbd5da 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.TableRead;
 
@@ -38,9 +39,12 @@ public class FileStoreSourceReader
         extends SingleThreadMultiplexSourceReaderBase<
                 RecordIterator<RowData>, RowData, FileStoreSourceSplit, 
FileStoreSourceSplitState> {
 
+    @Nullable private final IOManager ioManager;
+
     public FileStoreSourceReader(
             SourceReaderContext readerContext,
             TableRead tableRead,
+            @Nullable IOManager ioManager,
             @Nullable Long limit,
             @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
         // limiter is created in SourceReader, it can be shared in all split 
readers
@@ -51,11 +55,13 @@ public class FileStoreSourceReader
                 FlinkRecordsWithSplitIds::emitRecord,
                 readerContext.getConfiguration(),
                 readerContext);
+        this.ioManager = ioManager;
     }
 
     public FileStoreSourceReader(
             SourceReaderContext readerContext,
             TableRead tableRead,
+            @Nullable IOManager ioManager,
             @Nullable Long limit,
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
                     elementsQueue,
@@ -68,6 +74,7 @@ public class FileStoreSourceReader
                 FlinkRecordsWithSplitIds::emitRecord,
                 readerContext.getConfiguration(),
                 readerContext);
+        this.ioManager = ioManager;
     }
 
     @Override
@@ -98,4 +105,12 @@ public class FileStoreSourceReader
             String splitId, FileStoreSourceSplitState splitState) {
         return splitState.toSourceSplit();
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (ioManager != null) {
+            ioManager.close();
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index a62c50bb1..d0706a6f9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.ReadBuilder;
 
@@ -53,14 +52,11 @@ public abstract class FlinkSource
     @Override
     public SourceReader<RowData, FileStoreSourceSplit> 
createReader(SourceReaderContext context) {
         IOManager ioManager =
-                new 
IOManagerImpl(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+                
IOManager.create(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
         FileStoreSourceReaderMetrics sourceReaderMetrics =
                 new FileStoreSourceReaderMetrics(context.metricGroup());
         return new FileStoreSourceReader(
-                context,
-                readBuilder.newRead().withIOManager(ioManager),
-                limit,
-                sourceReaderMetrics);
+                context, readBuilder.newRead(), ioManager, limit, 
sourceReaderMetrics);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index c865eca4c..063daf1b3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.source.align;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.source.ContinuousFileStoreSource;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
@@ -60,7 +59,7 @@ public class AlignedContinuousFileStoreSource extends 
ContinuousFileStoreSource
     @Override
     public SourceReader<RowData, FileStoreSourceSplit> 
createReader(SourceReaderContext context) {
         IOManager ioManager =
-                new IOManagerImpl(
+                IOManager.create(
                         splitPaths(
                                 context.getConfiguration()
                                         
.get(org.apache.flink.configuration.CoreOptions.TMP_DIRS)));
@@ -68,7 +67,8 @@ public class AlignedContinuousFileStoreSource extends 
ContinuousFileStoreSource
                 new FileStoreSourceReaderMetrics(context.metricGroup());
         return new AlignedSourceReader(
                 context,
-                readBuilder.newRead().withIOManager(ioManager),
+                readBuilder.newRead(),
+                ioManager,
                 limit,
                 new FutureCompletingBlockingQueue<>(
                         context.getConfiguration()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 234a506ff..e0f909344 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source.align;
 
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.source.FileStoreSourceReader;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.FileStoreSourceSplitState;
@@ -52,11 +53,12 @@ public class AlignedSourceReader extends 
FileStoreSourceReader
     public AlignedSourceReader(
             SourceReaderContext readerContext,
             TableRead tableRead,
+            @Nullable IOManager ioManager,
             @Nullable Long limit,
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
                     elementsQueue,
             @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
-        super(readerContext, tableRead, limit, elementsQueue, 
sourceReaderMetrics);
+        super(readerContext, tableRead, ioManager, limit, elementsQueue, 
sourceReaderMetrics);
         this.elementsQueue = elementsQueue;
         this.nextCheckpointId = null;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index 37366a239..8a4232255 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.source.operator;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -62,7 +62,7 @@ public class MultiTablesReadOperator extends 
AbstractStreamOperator<RowData>
     }
 
     private transient Catalog catalog;
-    private transient IOManagerImpl ioManager;
+    private transient IOManager ioManager;
     private transient Map<Identifier, BucketsTable> tablesMap;
     private transient Map<Identifier, TableRead> readsMap;
     private transient StreamRecord<RowData> reuseRecord;
@@ -72,7 +72,7 @@ public class MultiTablesReadOperator extends 
AbstractStreamOperator<RowData>
     public void open() throws Exception {
         super.open();
         ioManager =
-                new IOManagerImpl(
+                IOManager.create(
                         getContainingTask()
                                 .getEnvironment()
                                 .getIOManager()
@@ -122,4 +122,12 @@ public class MultiTablesReadOperator extends 
AbstractStreamOperator<RowData>
 
         return readsMap.get(tableId);
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (ioManager != null) {
+            ioManager.close();
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index d6834ee70..0a1b3c511 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -46,6 +46,7 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
     private transient TableRead read;
     private transient StreamRecord<RowData> reuseRecord;
     private transient FlinkRowData reuseRow;
+    private transient IOManager ioManager;
 
     public ReadOperator(ReadBuilder readBuilder) {
         this.readBuilder = readBuilder;
@@ -54,8 +55,8 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
     @Override
     public void open() throws Exception {
         super.open();
-        IOManagerImpl ioManager =
-                new IOManagerImpl(
+        this.ioManager =
+                IOManager.create(
                         getContainingTask()
                                 .getEnvironment()
                                 .getIOManager()
@@ -75,4 +76,12 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
             }
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (ioManager != null) {
+            ioManager.close();
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index f8f29c099..b92d30f32 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -109,6 +109,7 @@ public class FileStoreSourceReaderTest {
                 context,
                 new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
                 null,
+                null,
                 null);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index e12ef5c6d..0d38ae3cf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -66,6 +66,7 @@ public class AlignedSourceReaderTest extends 
FileStoreSourceReaderTest {
                 context,
                 new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
                 null,
+                null,
                 new FutureCompletingBlockingQueue<>(2),
                 null);
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
index ec6da01f4..0c299cabe 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.reader.RecordReaderIterator;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -28,13 +29,15 @@ import java.io.IOException;
 /** A spark 3 {@link PartitionReader} for paimon, created by {@link 
PartitionReaderFactory}. */
 public class SparkInputPartitionReader implements PartitionReader<InternalRow> 
{
 
+    private final IOManager ioManager;
     private final RecordReaderIterator<org.apache.paimon.data.InternalRow> 
iterator;
-
     private final SparkInternalRow row;
 
     public SparkInputPartitionReader(
+            IOManager ioManager,
             RecordReaderIterator<org.apache.paimon.data.InternalRow> iterator,
             SparkInternalRow row) {
+        this.ioManager = ioManager;
         this.iterator = iterator;
         this.row = row;
     }
@@ -60,5 +63,10 @@ public class SparkInputPartitionReader implements 
PartitionReader<InternalRow> {
         } catch (Exception e) {
             throw new IOException(e);
         }
+        try {
+            ioManager.close();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
index 64c74c5f8..279aa5853 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
@@ -18,6 +18,7 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -46,17 +47,18 @@ public class SparkReaderFactory implements 
PartitionReaderFactory {
     public PartitionReader<org.apache.spark.sql.catalyst.InternalRow> 
createReader(
             InputPartition partition) {
         RecordReader<InternalRow> reader;
+        IOManager ioManager = createIOManager();
         try {
             reader =
                     readBuilder
                             .newRead()
-                            .withIOManager(createIOManager())
+                            .withIOManager(ioManager)
                             .createReader(((SparkInputPartition) 
partition).split());
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
         RecordReaderIterator<InternalRow> iterator = new 
RecordReaderIterator<>(reader);
         SparkInternalRow row = new SparkInternalRow(readBuilder.readType());
-        return new SparkInputPartitionReader(iterator, row);
+        return new SparkInputPartitionReader(ioManager, iterator, row);
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 1770dc965..b04b6f021 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -122,8 +122,9 @@ case class WriteIntoPaimonTable(
     val commitMessages = df
       .mapPartitions {
         iter =>
+          val ioManager = createIOManager
           val write = writeBuilder.newWrite()
-          write.withIOManager(createIOManager)
+          write.withIOManager(ioManager)
           try {
             iter.foreach {
               row =>
@@ -135,6 +136,7 @@ case class WriteIntoPaimonTable(
             write.prepareCommit().asScala.map(serializer.serialize).toIterator
           } finally {
             write.close()
+            ioManager.close()
           }
       }
       .collect()

Reply via email to