This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9aa9026a8 [core] Close IOManager to clear directories (#2174)
9aa9026a8 is described below
commit 9aa9026a842a702a93070b85e49270a746293656
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 24 20:47:24 2023 +0800
[core] Close IOManager to clear directories (#2174)
---
.../org/apache/paimon/utils/AsyncRecordReader.java | 8 ++++----
.../paimon/flink/sink/StoreSinkWriteImpl.java | 8 +++++---
.../sink/index/GlobalIndexAssignerOperator.java | 9 ++++++---
.../apache/paimon/flink/sorter/SortOperator.java | 23 +++++++++++++++-------
.../paimon/flink/source/FileStoreSourceReader.java | 15 ++++++++++++++
.../apache/paimon/flink/source/FlinkSource.java | 8 ++------
.../align/AlignedContinuousFileStoreSource.java | 6 +++---
.../flink/source/align/AlignedSourceReader.java | 4 +++-
.../source/operator/MultiTablesReadOperator.java | 14 ++++++++++---
.../paimon/flink/source/operator/ReadOperator.java | 15 +++++++++++---
.../flink/source/FileStoreSourceReaderTest.java | 1 +
.../source/align/AlignedSourceReaderTest.java | 1 +
.../paimon/spark/SparkInputPartitionReader.java | 10 +++++++++-
.../apache/paimon/spark/SparkReaderFactory.java | 6 ++++--
.../spark/commands/WriteIntoPaimonTable.scala | 4 +++-
15 files changed, 95 insertions(+), 37 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
index 123b02453..c7563976b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
@@ -39,17 +39,17 @@ public class AsyncRecordReader<T> implements
RecordReader<T> {
private final BlockingQueue<Element> queue;
private final Future<Void> future;
- private final ClassLoader classLoader;
private boolean isEnd = false;
public AsyncRecordReader(IOExceptionSupplier<RecordReader<T>> supplier) {
this.queue = new LinkedBlockingQueue<>();
- this.future = ASYNC_EXECUTOR.submit(() -> asyncRead(supplier));
- this.classLoader = Thread.currentThread().getContextClassLoader();
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ this.future = ASYNC_EXECUTOR.submit(() -> asyncRead(supplier,
classLoader));
}
- private Void asyncRead(IOExceptionSupplier<RecordReader<T>> supplier)
throws IOException {
+ private Void asyncRead(IOExceptionSupplier<RecordReader<T>> supplier,
ClassLoader classLoader)
+ throws IOException {
// set classloader, otherwise, its classloader belongs to its creator. It is possible that
// its creator's classloader has already exited, which will cause subsequent reads to report
// exceptions
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index d1baa5e85..8cf7a64e0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -52,7 +52,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
protected final String commitUser;
protected final StoreSinkWriteState state;
- private final IOManager ioManager;
+ private final IOManagerImpl paimonIOManager;
private final boolean ignorePreviousFiles;
private final boolean waitCompaction;
private final boolean isStreamingMode;
@@ -115,7 +115,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
@Nullable MemoryPoolFactory memoryPoolFactory) {
this.commitUser = commitUser;
this.state = state;
- this.ioManager = ioManager;
+ this.paimonIOManager = new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
this.ignorePreviousFiles = ignorePreviousFiles;
this.waitCompaction = waitCompaction;
this.isStreamingMode = isStreamingMode;
@@ -134,7 +134,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
commitUser,
(part, bucket) ->
state.stateValueFilter().filter(table.name(), part, bucket))
- .withIOManager(new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+ .withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
.isStreamingMode(isStreamingMode);
@@ -216,6 +216,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
if (write != null) {
write.close();
}
+
+ paimonIOManager.close();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index c04eb13cf..19428155f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -40,7 +40,6 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.io.File;
-import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
/** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
@@ -55,6 +54,7 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
private final SerializableFunction<T, InternalRow> toRow;
private final SerializableFunction<InternalRow, T> fromRow;
+ private transient IOManager ioManager;
private transient RowBuffer bootstrapBuffer;
public GlobalIndexAssignerOperator(
@@ -75,7 +75,7 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
getContainingTask().getEnvironment().getIOManager();
File[] tmpDirs = flinkIoManager.getSpillingDirectories();
File tmpDir =
tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
- IOManager ioManager =
IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
+ ioManager =
IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
assigner.open(
ioManager,
tmpDir,
@@ -143,8 +143,11 @@ public class GlobalIndexAssignerOperator<T> extends
AbstractStreamOperator<Tuple
}
@Override
- public void close() throws IOException {
+ public void close() throws Exception {
this.assigner.close();
+ if (ioManager != null) {
+ ioManager.close();
+ }
}
public static GlobalIndexAssignerOperator<InternalRow> forRowData(Table
table) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index 223f1e6e0..5d16c445a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -27,7 +27,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
@@ -50,7 +50,9 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
private final int pageSize;
private final int arity;
private final int spillSortMaxNumFiles;
+
private transient BinaryExternalSortBuffer buffer;
+ private transient IOManager ioManager;
public SortOperator(
RowType keyType,
@@ -87,17 +89,19 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
BinaryInMemorySortBuffer.createBuffer(
normalizedKeyComputer, serializer, keyComparator,
memoryPool);
+ this.ioManager =
+ IOManager.create(
+ getContainingTask()
+ .getEnvironment()
+ .getIOManager()
+ .getSpillingDirectoriesPaths());
buffer =
new BinaryExternalSortBuffer(
new BinaryRowSerializer(serializer.getArity()),
keyComparator,
memoryPool.pageSize(),
inMemorySortBuffer,
- new IOManagerImpl(
- getContainingTask()
- .getEnvironment()
- .getIOManager()
- .getSpillingDirectoriesPaths()),
+ ioManager,
spillSortMaxNumFiles);
}
@@ -115,7 +119,12 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
@Override
public void close() throws Exception {
super.close();
- buffer.clear();
+ if (buffer != null) {
+ buffer.clear();
+ }
+ if (ioManager != null) {
+ ioManager.close();
+ }
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 08713da4c..155fbd5da 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.TableRead;
@@ -38,9 +39,12 @@ public class FileStoreSourceReader
extends SingleThreadMultiplexSourceReaderBase<
RecordIterator<RowData>, RowData, FileStoreSourceSplit,
FileStoreSourceSplitState> {
+ @Nullable private final IOManager ioManager;
+
public FileStoreSourceReader(
SourceReaderContext readerContext,
TableRead tableRead,
+ @Nullable IOManager ioManager,
@Nullable Long limit,
@Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
// limiter is created in SourceReader, it can be shared in all split readers
@@ -51,11 +55,13 @@ public class FileStoreSourceReader
FlinkRecordsWithSplitIds::emitRecord,
readerContext.getConfiguration(),
readerContext);
+ this.ioManager = ioManager;
}
public FileStoreSourceReader(
SourceReaderContext readerContext,
TableRead tableRead,
+ @Nullable IOManager ioManager,
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
elementsQueue,
@@ -68,6 +74,7 @@ public class FileStoreSourceReader
FlinkRecordsWithSplitIds::emitRecord,
readerContext.getConfiguration(),
readerContext);
+ this.ioManager = ioManager;
}
@Override
@@ -98,4 +105,12 @@ public class FileStoreSourceReader
String splitId, FileStoreSourceSplitState splitState) {
return splitState.toSourceSplit();
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (ioManager != null) {
+ ioManager.close();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index a62c50bb1..d0706a6f9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.ReadBuilder;
@@ -53,14 +52,11 @@ public abstract class FlinkSource
@Override
public SourceReader<RowData, FileStoreSourceSplit>
createReader(SourceReaderContext context) {
IOManager ioManager =
- new
IOManagerImpl(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+
IOManager.create(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
FileStoreSourceReaderMetrics sourceReaderMetrics =
new FileStoreSourceReaderMetrics(context.metricGroup());
return new FileStoreSourceReader(
- context,
- readBuilder.newRead().withIOManager(ioManager),
- limit,
- sourceReaderMetrics);
+ context, readBuilder.newRead(), ioManager, limit,
sourceReaderMetrics);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index c865eca4c..063daf1b3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.source.align;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
@@ -60,7 +59,7 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
@Override
public SourceReader<RowData, FileStoreSourceSplit>
createReader(SourceReaderContext context) {
IOManager ioManager =
- new IOManagerImpl(
+ IOManager.create(
splitPaths(
context.getConfiguration()
.get(org.apache.flink.configuration.CoreOptions.TMP_DIRS)));
@@ -68,7 +67,8 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
new FileStoreSourceReaderMetrics(context.metricGroup());
return new AlignedSourceReader(
context,
- readBuilder.newRead().withIOManager(ioManager),
+ readBuilder.newRead(),
+ ioManager,
limit,
new FutureCompletingBlockingQueue<>(
context.getConfiguration()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 234a506ff..e0f909344 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.source.align;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.source.FileStoreSourceReader;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
@@ -52,11 +53,12 @@ public class AlignedSourceReader extends
FileStoreSourceReader
public AlignedSourceReader(
SourceReaderContext readerContext,
TableRead tableRead,
+ @Nullable IOManager ioManager,
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
elementsQueue,
@Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
- super(readerContext, tableRead, limit, elementsQueue,
sourceReaderMetrics);
+ super(readerContext, tableRead, ioManager, limit, elementsQueue,
sourceReaderMetrics);
this.elementsQueue = elementsQueue;
this.nextCheckpointId = null;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index 37366a239..8a4232255 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.source.operator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -62,7 +62,7 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
}
private transient Catalog catalog;
- private transient IOManagerImpl ioManager;
+ private transient IOManager ioManager;
private transient Map<Identifier, BucketsTable> tablesMap;
private transient Map<Identifier, TableRead> readsMap;
private transient StreamRecord<RowData> reuseRecord;
@@ -72,7 +72,7 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
public void open() throws Exception {
super.open();
ioManager =
- new IOManagerImpl(
+ IOManager.create(
getContainingTask()
.getEnvironment()
.getIOManager()
@@ -122,4 +122,12 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
return readsMap.get(tableId);
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (ioManager != null) {
+ ioManager.close();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index d6834ee70..0a1b3c511 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.source.operator;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -46,6 +46,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
private transient TableRead read;
private transient StreamRecord<RowData> reuseRecord;
private transient FlinkRowData reuseRow;
+ private transient IOManager ioManager;
public ReadOperator(ReadBuilder readBuilder) {
this.readBuilder = readBuilder;
@@ -54,8 +55,8 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
@Override
public void open() throws Exception {
super.open();
- IOManagerImpl ioManager =
- new IOManagerImpl(
+ this.ioManager =
+ IOManager.create(
getContainingTask()
.getEnvironment()
.getIOManager()
@@ -75,4 +76,12 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
}
}
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (ioManager != null) {
+ ioManager.close();
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index f8f29c099..b92d30f32 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -109,6 +109,7 @@ public class FileStoreSourceReaderTest {
context,
new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
null,
+ null,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index e12ef5c6d..0d38ae3cf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -66,6 +66,7 @@ public class AlignedSourceReaderTest extends
FileStoreSourceReaderTest {
context,
new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
null,
+ null,
new FutureCompletingBlockingQueue<>(2),
null);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
index ec6da01f4..0c299cabe 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java
@@ -17,6 +17,7 @@
package org.apache.paimon.spark;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -28,13 +29,15 @@ import java.io.IOException;
/** A spark 3 {@link PartitionReader} for paimon, created by {@link PartitionReaderFactory}. */
public class SparkInputPartitionReader implements PartitionReader<InternalRow>
{
+ private final IOManager ioManager;
private final RecordReaderIterator<org.apache.paimon.data.InternalRow>
iterator;
-
private final SparkInternalRow row;
public SparkInputPartitionReader(
+ IOManager ioManager,
RecordReaderIterator<org.apache.paimon.data.InternalRow> iterator,
SparkInternalRow row) {
+ this.ioManager = ioManager;
this.iterator = iterator;
this.row = row;
}
@@ -60,5 +63,10 @@ public class SparkInputPartitionReader implements
PartitionReader<InternalRow> {
} catch (Exception e) {
throw new IOException(e);
}
+ try {
+ ioManager.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
index 64c74c5f8..279aa5853 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.source.ReadBuilder;
@@ -46,17 +47,18 @@ public class SparkReaderFactory implements
PartitionReaderFactory {
public PartitionReader<org.apache.spark.sql.catalyst.InternalRow>
createReader(
InputPartition partition) {
RecordReader<InternalRow> reader;
+ IOManager ioManager = createIOManager();
try {
reader =
readBuilder
.newRead()
- .withIOManager(createIOManager())
+ .withIOManager(ioManager)
.createReader(((SparkInputPartition)
partition).split());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
RecordReaderIterator<InternalRow> iterator = new
RecordReaderIterator<>(reader);
SparkInternalRow row = new SparkInternalRow(readBuilder.readType());
- return new SparkInputPartitionReader(iterator, row);
+ return new SparkInputPartitionReader(ioManager, iterator, row);
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 1770dc965..b04b6f021 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -122,8 +122,9 @@ case class WriteIntoPaimonTable(
val commitMessages = df
.mapPartitions {
iter =>
+ val ioManager = createIOManager
val write = writeBuilder.newWrite()
- write.withIOManager(createIOManager)
+ write.withIOManager(ioManager)
try {
iter.foreach {
row =>
@@ -135,6 +136,7 @@ case class WriteIntoPaimonTable(
write.prepareCommit().asScala.map(serializer.serialize).toIterator
} finally {
write.close()
+ ioManager.close()
}
}
.collect()