This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 72b31d446 [core] Fix iterator bug after reset for InMemoryBuffer (#2175)
72b31d446 is described below
commit 72b31d44633628761484ce7685c2df4a800c6c41
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 25 13:19:53 2023 +0800
[core] Fix iterator bug after reset for InMemoryBuffer (#2175)
---
.../org/apache/paimon/disk/InMemoryBuffer.java | 24 ++++++++--
.../org/apache/paimon/disk/InMemoryBufferTest.java | 53 +++++++++++++++++-----
2 files changed, 61 insertions(+), 16 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index a9eaf55e5..39a39e1d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -44,6 +44,8 @@ public class InMemoryBuffer implements RowBuffer {
private int numBytesInLastBuffer;
private int numRecords = 0;
+ private boolean isInitialized;
+
InMemoryBuffer(MemorySegmentPool pool,
AbstractRowDataSerializer<InternalRow> serializer) {
// serializer has states, so we must duplicate
this.serializer = (AbstractRowDataSerializer<InternalRow>)
serializer.duplicate();
@@ -52,14 +54,26 @@ public class InMemoryBuffer implements RowBuffer {
this.recordBufferSegments = new ArrayList<>();
this.recordCollector =
new SimpleCollectingOutputView(this.recordBufferSegments,
pool, segmentSize);
+ this.isInitialized = true;
+ }
+
+ /** Try to initialize the buffer if all contained data is discarded. */
+ private void tryInitialize() {
+ if (!isInitialized) {
+ this.recordCollector.reset();
+ this.isInitialized = true;
+ }
}
@Override
public void reset() {
- this.currentDataBufferOffset = 0;
- this.numRecords = 0;
- returnToSegmentPool();
- this.recordCollector.reset();
+ if (this.isInitialized) {
+ this.currentDataBufferOffset = 0;
+ this.numBytesInLastBuffer = 0;
+ this.numRecords = 0;
+ returnToSegmentPool();
+ this.isInitialized = false;
+ }
}
private void returnToSegmentPool() {
@@ -70,6 +84,7 @@ public class InMemoryBuffer implements RowBuffer {
@Override
public boolean put(InternalRow row) throws IOException {
try {
+ tryInitialize();
this.serializer.serializeToPages(row, this.recordCollector);
currentDataBufferOffset = this.recordCollector.getCurrentOffset();
numBytesInLastBuffer =
this.recordCollector.getCurrentPositionInSegment();
@@ -95,6 +110,7 @@ public class InMemoryBuffer implements RowBuffer {
@Override
public InMemoryBufferIterator newIterator() {
+ tryInitialize();
RandomAccessInputView recordBuffer =
new RandomAccessInputView(
this.recordBufferSegments, segmentSize,
numBytesInLastBuffer);
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index 9bcfdc6c6..8e3809baa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -22,18 +22,19 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.RowBuffer.RowBufferIterator;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.types.DataTypes;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;
+import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link RowBuffer}. */
+/** Tests for {@link InMemoryBuffer}. */
public class InMemoryBufferTest {
private InternalRowSerializer serializer;
@@ -59,13 +60,13 @@ public class InMemoryBufferTest {
binaryRowWriter.complete();
boolean result = buffer.put(binaryRow);
- Assertions.assertThat(result).isTrue();
+ assertThat(result).isTrue();
result = buffer.put(binaryRow);
- Assertions.assertThat(result).isTrue();
+ assertThat(result).isTrue();
result = buffer.put(binaryRow);
- Assertions.assertThat(result).isTrue();
+ assertThat(result).isTrue();
result = buffer.put(binaryRow);
- Assertions.assertThat(result).isFalse();
+ assertThat(result).isFalse();
}
@Test
@@ -86,16 +87,16 @@ public class InMemoryBufferTest {
buffer.put(binaryRow.copy());
}
- Assertions.assertThat(buffer.size()).isEqualTo(100);
- try (RowBuffer.RowBufferIterator iterator = buffer.newIterator()) {
+ assertThat(buffer.size()).isEqualTo(100);
+ try (RowBufferIterator iterator = buffer.newIterator()) {
while (iterator.advanceNext()) {
- Assertions.assertThat(iterator.getRow()).isEqualTo(binaryRow);
+ assertThat(iterator.getRow()).isEqualTo(binaryRow);
}
}
}
@Test
- public void testClose() throws Exception {
+ public void testReset() throws Exception {
InMemoryBuffer buffer =
new InMemoryBuffer(
new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE,
DEFAULT_PAGE_SIZE),
@@ -110,8 +111,36 @@ public class InMemoryBufferTest {
binaryRowWriter.complete();
buffer.put(binaryRow.copy());
- Assertions.assertThat(buffer.memoryOccupancy()).isGreaterThan(0);
+ assertThat(buffer.memoryOccupancy()).isGreaterThan(0);
buffer.reset();
- Assertions.assertThat(buffer.memoryOccupancy()).isEqualTo(0);
+ assertThat(buffer.memoryOccupancy()).isEqualTo(0);
+
+ // test read after reset
+ try (RowBufferIterator iterator = buffer.newIterator()) {
+ assertThat(iterator.advanceNext()).isFalse();
+ }
+
+ // write again
+ buffer.put(binaryRow.copy());
+ buffer.put(binaryRow.copy());
+ buffer.put(binaryRow.copy());
+ try (RowBufferIterator iterator = buffer.newIterator()) {
+ int count = 0;
+ while (iterator.advanceNext()) {
+ assertThat(iterator.getRow()).isEqualTo(binaryRow);
+ count++;
+ }
+ assertThat(count).isEqualTo(3);
+ }
+ }
+
+ @Test
+ public void testEmpty() throws Exception {
+ InMemoryBuffer buffer =
+ new InMemoryBuffer(
+ new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE,
DEFAULT_PAGE_SIZE),
+ this.serializer);
+ RowBufferIterator iterator = buffer.newIterator();
+ assertThat(iterator.advanceNext()).isFalse();
}
}