This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f7ebb842e [core] Fix InMemoryBuffer preempt loop (#2239)
f7ebb842e is described below
commit f7ebb842e9646f01243197e8231a55ec78d4c89e
Author: YeJunHao <[email protected]>
AuthorDate: Thu Nov 2 16:21:59 2023 +0800
[core] Fix InMemoryBuffer preempt loop (#2239)
---
.../org/apache/paimon/disk/InMemoryBuffer.java | 41 ++++++++++++-
.../apache/paimon/memory/MemoryPoolFactory.java | 2 +-
.../org/apache/paimon/disk/InMemoryBufferTest.java | 69 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index 39a39e1d9..d1218ee66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.RandomAccessInputView;
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.utils.MutableObjectIterator;
@@ -34,6 +35,9 @@ import java.util.ArrayList;
/** Only cache {@link InternalRow}s in memory. */
public class InMemoryBuffer implements RowBuffer {
+ private static final EmptyInMemoryBufferIterator EMPTY_ITERATOR =
+ new EmptyInMemoryBufferIterator();
+
private final AbstractRowDataSerializer<InternalRow> serializer;
private final ArrayList<MemorySegment> recordBufferSegments;
private final SimpleCollectingOutputView recordCollector;
@@ -110,7 +114,10 @@ public class InMemoryBuffer implements RowBuffer {
@Override
public InMemoryBufferIterator newIterator() {
- tryInitialize();
+ if (!isInitialized) {
+ // to avoid request memory
+ return EMPTY_ITERATOR;
+ }
RandomAccessInputView recordBuffer =
new RandomAccessInputView(
this.recordBufferSegments, segmentSize,
numBytesInLastBuffer);
@@ -126,6 +133,9 @@ public class InMemoryBuffer implements RowBuffer {
}
int getNumRecordBuffers() {
+ if (!isInitialized) {
+ return 0;
+ }
int result = (int) (currentDataBufferOffset / segmentSize);
long mod = currentDataBufferOffset % segmentSize;
if (mod != 0) {
@@ -190,4 +200,33 @@ public class InMemoryBuffer implements RowBuffer {
@Override
public void close() {}
}
+
+ // Use this to return an empty iterator instead of using an interface (a virtual function call
+ // would cause a performance loss).
+ private static class EmptyInMemoryBufferIterator extends InMemoryBufferIterator {
+
+ private EmptyInMemoryBufferIterator() {
+ super(null, new InternalRowSerializer()); // the overrides below never read an input view
+ }
+
+ @Override
+ public boolean advanceNext() {
+ return false;
+ }
+
+ @Override
+ public BinaryRow getRow() {
+ throw new UnsupportedOperationException("An empty iterator has no current row.");
+ }
+
+ @Override
+ public BinaryRow next(BinaryRow reuse) {
+ return null; // no rows: there is never a next record
+ }
+
+ @Override
+ public BinaryRow next() {
+ return null;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
index 8770348e4..73bb66de2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
@@ -67,7 +67,7 @@ public class MemoryPoolFactory {
}
private void preemptMemory(MemoryOwner owner) {
- long maxMemory = -1;
+ long maxMemory = 0;
MemoryOwner max = null;
for (MemoryOwner other : owners) {
// Don't preempt yourself! Write and flush at the same time, which
may lead to
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index 8e3809baa..c197fcb8b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -24,8 +24,12 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.RowBuffer.RowBufferIterator;
import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.types.DataTypes;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -143,4 +147,69 @@ public class InMemoryBufferTest {
RowBufferIterator iterator = buffer.newIterator();
assertThat(iterator.advanceNext()).isFalse();
}
+
+ @Test
+ public void testMemoryPoolWorksWellWithInMemoryBuffer() {
+ MemoryPoolFactory memoryPoolFactory =
+ new MemoryPoolFactory(
+ new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE));
+
+ Owner owner1 = new Owner(this.serializer);
+ Owner owner2 = new Owner(this.serializer);
+ memoryPoolFactory.addOwners(Arrays.asList(owner1, owner2));
+ memoryPoolFactory.notifyNewOwner(owner1);
+ memoryPoolFactory.notifyNewOwner(owner2);
+
+ owner1.reset();
+
+ for (int i = 0; i < 100; i++) { // regression check for the preempt loop fixed in #2239
+ Assertions.assertThatCode(owner2::put).doesNotThrowAnyException();
+ }
+ }
+
+ /** Test {@link MemoryOwner} holding an {@link InMemoryBuffer} on the assigned pool. */
+ public static class Owner implements MemoryOwner {
+
+ private final InternalRowSerializer internalRowSerializer;
+ private InMemoryBuffer inMemoryBuffer;
+ private final BinaryRow binaryRow;
+
+ public Owner(InternalRowSerializer internalRowSerializer) {
+ this.internalRowSerializer = internalRowSerializer;
+
+ binaryRow = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+ byte[] s = new byte[1024];
+ Arrays.fill(s, (byte) 'a');
+ binaryRowWriter.writeString(0, BinaryString.fromBytes(s));
+ binaryRowWriter.complete();
+ }
+
+ @Override
+ public void setMemoryPool(MemorySegmentPool memoryPool) {
+ this.inMemoryBuffer = new InMemoryBuffer(memoryPool, internalRowSerializer);
+ }
+
+ @Override
+ public long memoryOccupancy() {
+ return inMemoryBuffer.memoryOccupancy();
+ }
+
+ @Override
+ public void flushMemory() {
+ inMemoryBuffer.complete();
+ // Emulate a real-world flush of data to disk, which has to call newIterator().
+ inMemoryBuffer.newIterator();
+ inMemoryBuffer.reset();
+ }
+
+ public boolean put() throws Exception {
+ return inMemoryBuffer.put(binaryRow.copy());
+ }
+
+ public void reset() {
+ inMemoryBuffer.reset();
+ }
+ }
}