This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5e2fcfc72 [core] Preempt memory will cause writer flush memory to DFS in AppendOnlyWriter (#2940)
5e2fcfc72 is described below
commit 5e2fcfc72abc93160df043f95d5d39644fc0f45e
Author: YeJunHao <[email protected]>
AuthorDate: Mon Mar 4 21:33:46 2024 +0800
[core] Preempt memory will cause writer flush memory to DFS in AppendOnlyWriter (#2940)
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 21 ++++++--
.../org/apache/paimon/disk/ExternalBuffer.java | 6 +++
.../org/apache/paimon/disk/InMemoryBuffer.java | 5 ++
.../java/org/apache/paimon/disk/RowBuffer.java | 2 +
.../apache/paimon/append/AppendOnlyWriterTest.java | 60 +++++++++++++++++++++-
5 files changed, 89 insertions(+), 5 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 803ba475c..0810f023b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -176,8 +176,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
return compactManager.isCompacting();
}
- private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
- throws Exception {
+ @VisibleForTesting
+ void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
long start = System.currentTimeMillis();
List<DataFileMeta> flushedFiles = sinkWriter.flush();
@@ -279,7 +279,10 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
@Override
public void flushMemory() throws Exception {
- flush(false, false);
+ boolean success = sinkWriter.flushMemory();
+ if (!success) {
+ flush(false, false);
+ }
}
@VisibleForTesting
@@ -303,6 +306,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
List<DataFileMeta> flush() throws IOException;
+ boolean flushMemory() throws IOException;
+
long memoryOccupancy();
void close();
@@ -340,6 +345,11 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
return flushedFiles;
}
+ @Override
+ public boolean flushMemory() throws IOException {
+ return false;
+ }
+
@Override
public long memoryOccupancy() {
return 0;
@@ -430,5 +440,10 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
public boolean bufferSpillableWriter() {
return spillable;
}
+
+ @Override
+ public boolean flushMemory() throws IOException {
+ return writeBuffer.flushMemory();
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 24c3611c7..5181a9ed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -87,6 +87,12 @@ public class ExternalBuffer implements RowBuffer {
addCompleted = false;
}
+ @Override
+ public boolean flushMemory() throws IOException {
+ spill();
+ return true;
+ }
+
@Override
public boolean put(InternalRow row) throws IOException {
checkState(!addCompleted, "This buffer has add completed.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index d1218ee66..83c4e423b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -80,6 +80,11 @@ public class InMemoryBuffer implements RowBuffer {
}
}
+ @Override
+ public boolean flushMemory() throws IOException {
+ return false;
+ }
+
private void returnToSegmentPool() {
pool.returnAll(this.recordBufferSegments);
this.recordBufferSegments.clear();
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index c5c881331..7589b45ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -39,6 +39,8 @@ public interface RowBuffer {
void reset();
+ boolean flushMemory() throws IOException;
+
RowBufferIterator newIterator();
/** Iterator to fetch record from buffer. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 05f8e883f..5ab6b8537 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.DataType;
@@ -54,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -335,6 +337,60 @@ public class AppendOnlyWriterTest {
writer.close();
}
+ @Test
+ public void testSpillWorksAndMoreSmallFilesGenerated() throws Exception {
+ List<AppendOnlyWriter> writers = new ArrayList<>();
+ HeapMemorySegmentPool heapMemorySegmentPool = new HeapMemorySegmentPool(2501024L, 1024);
+ MemoryPoolFactory memoryPoolFactory = new MemoryPoolFactory(heapMemorySegmentPool);
+ for (int i = 0; i < 1000; i++) {
+ AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
+ memoryPoolFactory.addOwners(Arrays.asList(writer));
+ memoryPoolFactory.notifyNewOwner(writer);
+ writers.add(writer);
+ }
+
+ char[] s = new char[1024];
+ Arrays.fill(s, 'a');
+
+ for (AppendOnlyWriter writer : writers) {
+ writer.write(row(0, String.valueOf("a"), PART));
+ }
+
+ for (AppendOnlyWriter writer : writers) {
+ writer.write(row(0, String.valueOf(s), PART));
+ }
+
+ for (int j = 0; j < 100; j++) {
+ for (AppendOnlyWriter writer : writers) {
+ writer.write(row(j, String.valueOf(s), PART));
+ writer.write(row(j, String.valueOf(s), PART));
+ writer.write(row(j, String.valueOf(s), PART));
+ writer.write(row(j, String.valueOf(s), PART));
+ writer.write(row(j, String.valueOf(s), PART));
+ }
+ }
+
+ writers.forEach(
+ writer -> {
+ try {
+ List<DataFileMeta> fileMetas =
+ writer.prepareCommit(false).newFilesIncrement().newFiles();
+ assertThat(fileMetas.size()).isEqualTo(1);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ writers.forEach(
+ writer -> {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
@Test
public void testNoBuffer() throws Exception {
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE);
@@ -363,7 +419,7 @@ public class AppendOnlyWriterTest {
writer.write(row(j, String.valueOf(s), PART));
}
- writer.flushMemory();
+ writer.flush(false, false);
Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);
Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(0);
@@ -374,7 +430,7 @@ public class AppendOnlyWriterTest {
for (int j = 0; j < 100; j++) {
writer.write(row(j, String.valueOf(s), PART));
}
- writer.flushMemory();
+ writer.flush(false, false);
Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);