This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 72b31d446 [core] Fix iterator bug after reset for InMemoryBuffer (#2175)
72b31d446 is described below

commit 72b31d44633628761484ce7685c2df4a800c6c41
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 25 13:19:53 2023 +0800

    [core] Fix iterator bug after reset for InMemoryBuffer (#2175)
---
 .../org/apache/paimon/disk/InMemoryBuffer.java     | 24 ++++++++--
 .../org/apache/paimon/disk/InMemoryBufferTest.java | 53 +++++++++++++++++-----
 2 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index a9eaf55e5..39a39e1d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -44,6 +44,8 @@ public class InMemoryBuffer implements RowBuffer {
     private int numBytesInLastBuffer;
     private int numRecords = 0;
 
+    private boolean isInitialized;
+
     InMemoryBuffer(MemorySegmentPool pool, 
AbstractRowDataSerializer<InternalRow> serializer) {
         // serializer has states, so we must duplicate
         this.serializer = (AbstractRowDataSerializer<InternalRow>) 
serializer.duplicate();
@@ -52,14 +54,26 @@ public class InMemoryBuffer implements RowBuffer {
         this.recordBufferSegments = new ArrayList<>();
         this.recordCollector =
                 new SimpleCollectingOutputView(this.recordBufferSegments, 
pool, segmentSize);
+        this.isInitialized = true;
+    }
+
+    /** Try to initialize the buffer if all contained data is discarded. */
+    private void tryInitialize() {
+        if (!isInitialized) {
+            this.recordCollector.reset();
+            this.isInitialized = true;
+        }
     }
 
     @Override
     public void reset() {
-        this.currentDataBufferOffset = 0;
-        this.numRecords = 0;
-        returnToSegmentPool();
-        this.recordCollector.reset();
+        if (this.isInitialized) {
+            this.currentDataBufferOffset = 0;
+            this.numBytesInLastBuffer = 0;
+            this.numRecords = 0;
+            returnToSegmentPool();
+            this.isInitialized = false;
+        }
     }
 
     private void returnToSegmentPool() {
@@ -70,6 +84,7 @@ public class InMemoryBuffer implements RowBuffer {
     @Override
     public boolean put(InternalRow row) throws IOException {
         try {
+            tryInitialize();
             this.serializer.serializeToPages(row, this.recordCollector);
             currentDataBufferOffset = this.recordCollector.getCurrentOffset();
             numBytesInLastBuffer = 
this.recordCollector.getCurrentPositionInSegment();
@@ -95,6 +110,7 @@ public class InMemoryBuffer implements RowBuffer {
 
     @Override
     public InMemoryBufferIterator newIterator() {
+        tryInitialize();
         RandomAccessInputView recordBuffer =
                 new RandomAccessInputView(
                         this.recordBufferSegments, segmentSize, 
numBytesInLastBuffer);
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index 9bcfdc6c6..8e3809baa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -22,18 +22,19 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.RowBuffer.RowBufferIterator;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.types.DataTypes;
 
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 
 import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;
+import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link RowBuffer}. */
+/** Tests for {@link InMemoryBuffer}. */
 public class InMemoryBufferTest {
 
     private InternalRowSerializer serializer;
@@ -59,13 +60,13 @@ public class InMemoryBufferTest {
         binaryRowWriter.complete();
 
         boolean result = buffer.put(binaryRow);
-        Assertions.assertThat(result).isTrue();
+        assertThat(result).isTrue();
         result = buffer.put(binaryRow);
-        Assertions.assertThat(result).isTrue();
+        assertThat(result).isTrue();
         result = buffer.put(binaryRow);
-        Assertions.assertThat(result).isTrue();
+        assertThat(result).isTrue();
         result = buffer.put(binaryRow);
-        Assertions.assertThat(result).isFalse();
+        assertThat(result).isFalse();
     }
 
     @Test
@@ -86,16 +87,16 @@ public class InMemoryBufferTest {
             buffer.put(binaryRow.copy());
         }
 
-        Assertions.assertThat(buffer.size()).isEqualTo(100);
-        try (RowBuffer.RowBufferIterator iterator = buffer.newIterator()) {
+        assertThat(buffer.size()).isEqualTo(100);
+        try (RowBufferIterator iterator = buffer.newIterator()) {
             while (iterator.advanceNext()) {
-                Assertions.assertThat(iterator.getRow()).isEqualTo(binaryRow);
+                assertThat(iterator.getRow()).isEqualTo(binaryRow);
             }
         }
     }
 
     @Test
-    public void testClose() throws Exception {
+    public void testReset() throws Exception {
         InMemoryBuffer buffer =
                 new InMemoryBuffer(
                         new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, 
DEFAULT_PAGE_SIZE),
@@ -110,8 +111,36 @@ public class InMemoryBufferTest {
         binaryRowWriter.complete();
         buffer.put(binaryRow.copy());
 
-        Assertions.assertThat(buffer.memoryOccupancy()).isGreaterThan(0);
+        assertThat(buffer.memoryOccupancy()).isGreaterThan(0);
         buffer.reset();
-        Assertions.assertThat(buffer.memoryOccupancy()).isEqualTo(0);
+        assertThat(buffer.memoryOccupancy()).isEqualTo(0);
+
+        // test read after reset
+        try (RowBufferIterator iterator = buffer.newIterator()) {
+            assertThat(iterator.advanceNext()).isFalse();
+        }
+
+        // write again
+        buffer.put(binaryRow.copy());
+        buffer.put(binaryRow.copy());
+        buffer.put(binaryRow.copy());
+        try (RowBufferIterator iterator = buffer.newIterator()) {
+            int count = 0;
+            while (iterator.advanceNext()) {
+                assertThat(iterator.getRow()).isEqualTo(binaryRow);
+                count++;
+            }
+            assertThat(count).isEqualTo(3);
+        }
+    }
+
+    @Test
+    public void testEmpty() throws Exception {
+        InMemoryBuffer buffer =
+                new InMemoryBuffer(
+                        new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, 
DEFAULT_PAGE_SIZE),
+                        this.serializer);
+        RowBufferIterator iterator = buffer.newIterator();
+        assertThat(iterator.advanceNext()).isFalse();
     }
 }

Reply via email to