This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e2fcfc72 [core] Preempt memory will cause writer flush memory to DFS in AppendOnlyWriter (#2940)
5e2fcfc72 is described below

commit 5e2fcfc72abc93160df043f95d5d39644fc0f45e
Author: YeJunHao <[email protected]>
AuthorDate: Mon Mar 4 21:33:46 2024 +0800

    [core] Preempt memory will cause writer flush memory to DFS in AppendOnlyWriter (#2940)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java | 21 ++++++--
 .../org/apache/paimon/disk/ExternalBuffer.java     |  6 +++
 .../org/apache/paimon/disk/InMemoryBuffer.java     |  5 ++
 .../java/org/apache/paimon/disk/RowBuffer.java     |  2 +
 .../apache/paimon/append/AppendOnlyWriterTest.java | 60 +++++++++++++++++++++-
 5 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 803ba475c..0810f023b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -176,8 +176,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
         return compactManager.isCompacting();
     }
 
-    private void flush(boolean waitForLatestCompaction, boolean 
forcedFullCompaction)
-            throws Exception {
+    @VisibleForTesting
+    void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) 
throws Exception {
         long start = System.currentTimeMillis();
         List<DataFileMeta> flushedFiles = sinkWriter.flush();
 
@@ -279,7 +279,10 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
 
     @Override
     public void flushMemory() throws Exception {
-        flush(false, false);
+        boolean success = sinkWriter.flushMemory();
+        if (!success) {
+            flush(false, false);
+        }
     }
 
     @VisibleForTesting
@@ -303,6 +306,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
 
         List<DataFileMeta> flush() throws IOException;
 
+        boolean flushMemory() throws IOException;
+
         long memoryOccupancy();
 
         void close();
@@ -340,6 +345,11 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
             return flushedFiles;
         }
 
+        @Override
+        public boolean flushMemory() throws IOException {
+            return false;
+        }
+
         @Override
         public long memoryOccupancy() {
             return 0;
@@ -430,5 +440,10 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
         public boolean bufferSpillableWriter() {
             return spillable;
         }
+
+        @Override
+        public boolean flushMemory() throws IOException {
+            return writeBuffer.flushMemory();
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 24c3611c7..5181a9ed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -87,6 +87,12 @@ public class ExternalBuffer implements RowBuffer {
         addCompleted = false;
     }
 
+    @Override
+    public boolean flushMemory() throws IOException {
+        spill();
+        return true;
+    }
+
     @Override
     public boolean put(InternalRow row) throws IOException {
         checkState(!addCompleted, "This buffer has add completed.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index d1218ee66..83c4e423b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -80,6 +80,11 @@ public class InMemoryBuffer implements RowBuffer {
         }
     }
 
+    @Override
+    public boolean flushMemory() throws IOException {
+        return false;
+    }
+
     private void returnToSegmentPool() {
         pool.returnAll(this.recordBufferSegments);
         this.recordBufferSegments.clear();
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index c5c881331..7589b45ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -39,6 +39,8 @@ public interface RowBuffer {
 
     void reset();
 
+    boolean flushMemory() throws IOException;
+
     RowBufferIterator newIterator();
 
     /** Iterator to fetch record from buffer. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 05f8e883f..5ab6b8537 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.DataType;
@@ -54,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -335,6 +337,60 @@ public class AppendOnlyWriterTest {
         writer.close();
     }
 
+    @Test
+    public void testSpillWorksAndMoreSmallFilesGenerated() throws Exception {
+        List<AppendOnlyWriter> writers = new ArrayList<>();
+        HeapMemorySegmentPool heapMemorySegmentPool = new 
HeapMemorySegmentPool(2501024L, 1024);
+        MemoryPoolFactory memoryPoolFactory = new 
MemoryPoolFactory(heapMemorySegmentPool);
+        for (int i = 0; i < 1000; i++) {
+            AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
+            memoryPoolFactory.addOwners(Arrays.asList(writer));
+            memoryPoolFactory.notifyNewOwner(writer);
+            writers.add(writer);
+        }
+
+        char[] s = new char[1024];
+        Arrays.fill(s, 'a');
+
+        for (AppendOnlyWriter writer : writers) {
+            writer.write(row(0, String.valueOf("a"), PART));
+        }
+
+        for (AppendOnlyWriter writer : writers) {
+            writer.write(row(0, String.valueOf(s), PART));
+        }
+
+        for (int j = 0; j < 100; j++) {
+            for (AppendOnlyWriter writer : writers) {
+                writer.write(row(j, String.valueOf(s), PART));
+                writer.write(row(j, String.valueOf(s), PART));
+                writer.write(row(j, String.valueOf(s), PART));
+                writer.write(row(j, String.valueOf(s), PART));
+                writer.write(row(j, String.valueOf(s), PART));
+            }
+        }
+
+        writers.forEach(
+                writer -> {
+                    try {
+                        List<DataFileMeta> fileMetas =
+                                
writer.prepareCommit(false).newFilesIncrement().newFiles();
+                        assertThat(fileMetas.size()).isEqualTo(1);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        writers.forEach(
+                writer -> {
+                    try {
+                        writer.close();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
     @Test
     public void testNoBuffer() throws Exception {
         AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE);
@@ -363,7 +419,7 @@ public class AppendOnlyWriterTest {
             writer.write(row(j, String.valueOf(s), PART));
         }
 
-        writer.flushMemory();
+        writer.flush(false, false);
         Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
         Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);
         Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(0);
@@ -374,7 +430,7 @@ public class AppendOnlyWriterTest {
         for (int j = 0; j < 100; j++) {
             writer.write(row(j, String.valueOf(s), PART));
         }
-        writer.flushMemory();
+        writer.flush(false, false);
 
         Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
         Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);

Reply via email to