This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 60b8d1d0e [core] RollingFileWriter should not refer all writers (#1382)
60b8d1d0e is described below

commit 60b8d1d0e0fec70c5f725e9db9d38f446e1323b7
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jun 15 17:00:08 2023 +0800

    [core] RollingFileWriter should not refer all writers (#1382)
---
 .../org/apache/paimon/io/RollingFileWriter.java    | 17 ++++++++++-----
 .../org/apache/paimon/io/SingleFileWriter.java     | 24 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
index 8bc8e41f8..7cd27e565 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriter.java
@@ -20,6 +20,7 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
 import org.apache.paimon.utils.Preconditions;
 
 import org.slf4j.Logger;
@@ -44,7 +45,7 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
     private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
     private final long targetFileSize;
-    private final List<SingleFileWriter<T, R>> openedWriters;
+    private final List<AbortExecutor> closedWriters;
     private final List<R> results;
 
     private SingleFileWriter<T, R> currentWriter = null;
@@ -55,8 +56,8 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
             Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
         this.writerFactory = writerFactory;
         this.targetFileSize = targetFileSize;
-        this.openedWriters = new ArrayList<>();
         this.results = new ArrayList<>();
+        this.closedWriters = new ArrayList<>();
     }
 
     @VisibleForTesting
@@ -97,7 +98,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
     private void openCurrentWriter() {
         currentWriter = writerFactory.get();
-        openedWriters.add(currentWriter);
     }
 
     private void closeCurrentWriter() throws IOException {
@@ -106,6 +106,10 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
         }
 
         currentWriter.close();
+        // only store abort executor in memory
+        // cannot store whole writer, it includes lots of memory for example column vectors to read
+        // and write
+        closedWriters.add(currentWriter.abortExecutor());
         results.add(currentWriter.result());
         currentWriter = null;
     }
@@ -117,8 +121,11 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
     @Override
     public void abort() {
-        for (FileWriter<T, R> writer : openedWriters) {
-            writer.abort();
+        if (currentWriter != null) {
+            currentWriter.abort();
+        }
+        for (AbortExecutor abortExecutor : closedWriters) {
+            abortExecutor.abort();
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 57cdc920e..06985026c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -124,6 +124,14 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
         fileIO.deleteQuietly(path);
     }
 
+    public AbortExecutor abortExecutor() {
+        if (!closed) {
+            throw new RuntimeException("Writer should be closed!");
+        }
+
+        return new AbortExecutor(fileIO, path);
+    }
+
     @Override
     public void close() throws IOException {
         if (closed) {
@@ -149,4 +157,20 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
             closed = true;
         }
     }
+
+    /** Abort executor to just have reference of path instead of whole writer. */
+    public static class AbortExecutor {
+
+        private final FileIO fileIO;
+        private final Path path;
+
+        private AbortExecutor(FileIO fileIO, Path path) {
+            this.fileIO = fileIO;
+            this.path = path;
+        }
+
+        public void abort() {
+            fileIO.deleteQuietly(path);
+        }
+    }
 }

Reply via email to