This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f7ebb842e [core] Fix InMemoryBuffer preempt loop (#2239)
f7ebb842e is described below

commit f7ebb842e9646f01243197e8231a55ec78d4c89e
Author: YeJunHao <[email protected]>
AuthorDate: Thu Nov 2 16:21:59 2023 +0800

    [core] Fix InMemoryBuffer preempt loop (#2239)
---
 .../org/apache/paimon/disk/InMemoryBuffer.java     | 41 ++++++++++++-
 .../apache/paimon/memory/MemoryPoolFactory.java    |  2 +-
 .../org/apache/paimon/disk/InMemoryBufferTest.java | 69 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index 39a39e1d9..d1218ee66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.RandomAccessInputView;
 import org.apache.paimon.data.SimpleCollectingOutputView;
 import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -34,6 +35,9 @@ import java.util.ArrayList;
 /** Only cache {@link InternalRow}s in memory. */
 public class InMemoryBuffer implements RowBuffer {

+    private static final EmptyInMemoryBufferIterator EMPTY_ITERATOR =
+            new EmptyInMemoryBufferIterator();
+
     private final AbstractRowDataSerializer<InternalRow> serializer;
     private final ArrayList<MemorySegment> recordBufferSegments;
     private final SimpleCollectingOutputView recordCollector;
@@ -110,7 +114,10 @@ public class InMemoryBuffer implements RowBuffer {

     @Override
     public InMemoryBufferIterator newIterator() {
-        tryInitialize();
+        if (!isInitialized) {
+            // to avoid request memory
+            return EMPTY_ITERATOR;
+        }
         RandomAccessInputView recordBuffer =
                 new RandomAccessInputView(
                         this.recordBufferSegments, segmentSize, numBytesInLastBuffer);
@@ -126,6 +133,9 @@ public class InMemoryBuffer implements RowBuffer {
     }

     int getNumRecordBuffers() {
+        if (!isInitialized) {
+            return 0;
+        }
         int result = (int) (currentDataBufferOffset / segmentSize);
         long mod = currentDataBufferOffset % segmentSize;
         if (mod != 0) {
@@ -190,4 +200,33 @@ public class InMemoryBuffer implements RowBuffer {
         @Override
         public void close() {}
     }
+
+    // Use this to return an empty iterator, instead of use an interface (virtual function call will
+    // cause performance loss)
+    private static class EmptyInMemoryBufferIterator extends InMemoryBufferIterator {
+
+        private EmptyInMemoryBufferIterator() {
+            super(null, new InternalRowSerializer());
+        }
+
+        @Override
+        public boolean advanceNext() {
+            return false;
+        }
+
+        @Override
+        public BinaryRow getRow() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public BinaryRow next(BinaryRow reuse) {
+            return null;
+        }
+
+        @Override
+        public BinaryRow next() {
+            return null;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
index 8770348e4..73bb66de2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
@@ -67,7 +67,7 @@ public class MemoryPoolFactory {
     }

     private void preemptMemory(MemoryOwner owner) {
-        long maxMemory = -1;
+        long maxMemory = 0;
         MemoryOwner max = null;
         for (MemoryOwner other : owners) {
             // Don't preempt yourself! Write and flush at the same time, which may lead to
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index 8e3809baa..c197fcb8b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -24,8 +24,12 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.RowBuffer.RowBufferIterator;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.types.DataTypes;

+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

@@ -143,4 +147,69 @@ public class InMemoryBufferTest {
         RowBufferIterator iterator = buffer.newIterator();
         assertThat(iterator.advanceNext()).isFalse();
     }
+
+    @Test
+    public void testMemoryPoolWorksWellWithInMemoryBuffer() {
+        MemoryPoolFactory memoryPoolFactory =
+                new MemoryPoolFactory(
+                        new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE));
+
+        Owner owner1 = new Owner(this.serializer);
+        Owner owner2 = new Owner(this.serializer);
+        memoryPoolFactory.addOwners(Arrays.asList(owner1, owner2));
+        memoryPoolFactory.notifyNewOwner(owner1);
+        memoryPoolFactory.notifyNewOwner(owner2);
+
+        owner1.reset();
+
+        for (int i = 0; i < 100; i++) {
+            Assertions.assertThatCode(owner2::put).doesNotThrowAnyException();
+        }
+    }
+
+    /** Used for test. */
+    public static class Owner implements MemoryOwner {
+
+        private final InternalRowSerializer internalRowSerializer;
+        private InMemoryBuffer inMemoryBuffer;
+        private final BinaryRow binaryRow;
+
+        public Owner(InternalRowSerializer internalRowSerializer) {
+            this.internalRowSerializer = internalRowSerializer;
+
+            binaryRow = new BinaryRow(1);
+            BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+            byte[] s = new byte[1024];
+            Arrays.fill(s, (byte) 'a');
+            binaryRowWriter.writeString(0, BinaryString.fromBytes(s));
+            binaryRowWriter.complete();
+        }
+
+        @Override
+        public void setMemoryPool(MemorySegmentPool memoryPool) {
+            this.inMemoryBuffer = new InMemoryBuffer(memoryPool, internalRowSerializer);
+        }
+
+        @Override
+        public long memoryOccupancy() {
+            return inMemoryBuffer.memoryOccupancy();
+        }
+
+        @Override
+        public void flushMemory() {
+            inMemoryBuffer.complete();
+            // emulate real-world flushing data to disk, we need to call newIterator method
+            inMemoryBuffer.newIterator();
+            inMemoryBuffer.reset();
+        }
+
+        public boolean put() throws Exception {
+            return inMemoryBuffer.put(binaryRow.copy());
+        }
+
+        public void reset() {
+            inMemoryBuffer.reset();
+        }
+    }
 }

Reply via email to