This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 04246b926 PARQUET-2357: Modest refactor of CapacityByteArrayOutputStream (#1165)
04246b926 is described below
commit 04246b92634d87315eeed18070cae83ea3048002
Author: fengjiajie <[email protected]>
AuthorDate: Sat Oct 14 22:34:21 2023 +0800
PARQUET-2357: Modest refactor of CapacityByteArrayOutputStream (#1165)
---
.../bytes/CapacityByteArrayOutputStream.java | 21 ++++------------
.../bytes/TestCapacityByteArrayOutputStream.java | 29 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
index 65e5b0ee4..0e7b758c4 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -61,7 +61,6 @@ public class CapacityByteArrayOutputStream extends
OutputStream {
private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
private ByteBuffer currentSlab;
- private int currentSlabIndex;
private int bytesAllocated = 0;
private int bytesUsed = 0;
private ByteBufferAllocator allocator;
@@ -193,7 +192,6 @@ public class CapacityByteArrayOutputStream extends
OutputStream {
this.currentSlab = allocator.allocate(nextSlabSize);
this.slabs.add(currentSlab);
this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
- this.currentSlabIndex = 0;
}
@Override
@@ -201,9 +199,7 @@ public class CapacityByteArrayOutputStream extends
OutputStream {
if (!currentSlab.hasRemaining()) {
addSlab(1);
}
- currentSlab.put(currentSlabIndex, (byte) b);
- currentSlabIndex += 1;
- currentSlab.position(currentSlabIndex);
+ currentSlab.put((byte) b);
bytesUsed = Math.addExact(bytesUsed, 1);
}
@@ -214,21 +210,16 @@ public class CapacityByteArrayOutputStream extends
OutputStream {
throw new IndexOutOfBoundsException(
String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
}
- if (len >= currentSlab.remaining()) {
+ if (len > currentSlab.remaining()) {
final int length1 = currentSlab.remaining();
currentSlab.put(b, off, length1);
- bytesUsed = Math.addExact(bytesUsed, length1);
- currentSlabIndex += length1;
final int length2 = len - length1;
addSlab(length2);
currentSlab.put(b, off + length1, length2);
- currentSlabIndex = length2;
- bytesUsed = Math.addExact(bytesUsed, length2);
} else {
currentSlab.put(b, off, len);
- currentSlabIndex += len;
- bytesUsed = Math.addExact(bytesUsed, len);
}
+ bytesUsed = Math.addExact(bytesUsed, len);
}
private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws
IOException {
@@ -252,10 +243,9 @@ public class CapacityByteArrayOutputStream extends
OutputStream {
* @exception IOException if an I/O error occurs.
*/
public void writeTo(OutputStream out) throws IOException {
- for (int i = 0; i < slabs.size() - 1; i++) {
- writeToOutput(out, slabs.get(i), slabs.get(i).position());
+ for (ByteBuffer slab : slabs) {
+ writeToOutput(out, slab, slab.position());
}
- writeToOutput(out, currentSlab, currentSlabIndex);
}
/**
@@ -290,7 +280,6 @@ public class CapacityByteArrayOutputStream extends
OutputStream {
this.bytesAllocated = 0;
this.bytesUsed = 0;
this.currentSlab = EMPTY_SLAB;
- this.currentSlabIndex = 0;
}
/**
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
index 89db1981f..b26f486fb 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
@@ -49,6 +49,35 @@ public class TestCapacityByteArrayOutputStream {
validate(capacityByteArrayOutputStream, v * 3);
}
+ @Test
+ public void testWriteArrayExpand() throws Throwable {
+ CapacityByteArrayOutputStream capacityByteArrayOutputStream =
newCapacityBAOS(2);
+ assertEquals(0, capacityByteArrayOutputStream.getCapacity());
+
+ byte[] toWrite = {(byte) (1), (byte) (2), (byte) (3), (byte) (4)};
+ int toWriteOffset = 0;
+ int writeLength = 2;
+ // write 2 bytes array
+ capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
+ toWriteOffset += writeLength;
+ assertEquals(2, capacityByteArrayOutputStream.size());
+ assertEquals(2, capacityByteArrayOutputStream.getCapacity());
+
+ // write 1 byte array, expand capacity to 4
+ writeLength = 1;
+ capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
+ toWriteOffset += writeLength;
+ assertEquals(3, capacityByteArrayOutputStream.size());
+ assertEquals(4, capacityByteArrayOutputStream.getCapacity());
+
+ // write 1 byte array, not expand
+ capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
+ assertEquals(4, capacityByteArrayOutputStream.size());
+ assertEquals(4, capacityByteArrayOutputStream.getCapacity());
+ final byte[] byteArray =
BytesInput.from(capacityByteArrayOutputStream).toByteArray();
+ assertArrayEquals(toWrite, byteArray);
+ }
+
@Test
public void testWriteArrayAndInt() throws Throwable {
CapacityByteArrayOutputStream capacityByteArrayOutputStream =
newCapacityBAOS(10);