This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 60b8d1d0e [core] RollingFileWriter should not refer all writers (#1382)
60b8d1d0e is described below
commit 60b8d1d0e0fec70c5f725e9db9d38f446e1323b7
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jun 15 17:00:08 2023 +0800
[core] RollingFileWriter should not refer all writers (#1382)
---
.../org/apache/paimon/io/RollingFileWriter.java | 17 ++++++++++-----
.../org/apache/paimon/io/SingleFileWriter.java | 24 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
index 8bc8e41f8..7cd27e565 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
@@ -20,6 +20,7 @@
package org.apache.paimon.io;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
@@ -44,7 +45,7 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
private final long targetFileSize;
- private final List<SingleFileWriter<T, R>> openedWriters;
+ private final List<AbortExecutor> closedWriters;
private final List<R> results;
private SingleFileWriter<T, R> currentWriter = null;
@@ -55,8 +56,8 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
this.writerFactory = writerFactory;
this.targetFileSize = targetFileSize;
- this.openedWriters = new ArrayList<>();
this.results = new ArrayList<>();
+ this.closedWriters = new ArrayList<>();
}
@VisibleForTesting
@@ -97,7 +98,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
private void openCurrentWriter() {
currentWriter = writerFactory.get();
- openedWriters.add(currentWriter);
}
private void closeCurrentWriter() throws IOException {
@@ -106,6 +106,10 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
}
currentWriter.close();
+ // only store abort executor in memory
+ // cannot store whole writer, it includes lots of memory for example column vectors to read
+ // and write
+ closedWriters.add(currentWriter.abortExecutor());
results.add(currentWriter.result());
currentWriter = null;
}
@@ -117,8 +121,11 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
@Override
public void abort() {
- for (FileWriter<T, R> writer : openedWriters) {
- writer.abort();
+ if (currentWriter != null) {
+ currentWriter.abort();
+ }
+ for (AbortExecutor abortExecutor : closedWriters) {
+ abortExecutor.abort();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 57cdc920e..06985026c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -124,6 +124,14 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
fileIO.deleteQuietly(path);
}
+ public AbortExecutor abortExecutor() {
+ if (!closed) {
+ throw new RuntimeException("Writer should be closed!");
+ }
+
+ return new AbortExecutor(fileIO, path);
+ }
+
@Override
public void close() throws IOException {
if (closed) {
@@ -149,4 +157,20 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
closed = true;
}
}
+
+ /** Abort executor to just have reference of path instead of whole writer. */
+ public static class AbortExecutor {
+
+ private final FileIO fileIO;
+ private final Path path;
+
+ private AbortExecutor(FileIO fileIO, Path path) {
+ this.fileIO = fileIO;
+ this.path = path;
+ }
+
+ public void abort() {
+ fileIO.deleteQuietly(path);
+ }
+ }
}