This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 04246b926 PARQUET-2357: Modest refactor of CapacityByteArrayOutputStream (#1165)
04246b926 is described below

commit 04246b92634d87315eeed18070cae83ea3048002
Author: fengjiajie <[email protected]>
AuthorDate: Sat Oct 14 22:34:21 2023 +0800

    PARQUET-2357: Modest refactor of CapacityByteArrayOutputStream (#1165)
---
 .../bytes/CapacityByteArrayOutputStream.java       | 21 ++++------------
 .../bytes/TestCapacityByteArrayOutputStream.java   | 29 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
index 65e5b0ee4..0e7b758c4 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -61,7 +61,6 @@ public class CapacityByteArrayOutputStream extends OutputStream {
   private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
 
   private ByteBuffer currentSlab;
-  private int currentSlabIndex;
   private int bytesAllocated = 0;
   private int bytesUsed = 0;
   private ByteBufferAllocator allocator;
@@ -193,7 +192,6 @@ public class CapacityByteArrayOutputStream extends OutputStream {
     this.currentSlab = allocator.allocate(nextSlabSize);
     this.slabs.add(currentSlab);
     this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
-    this.currentSlabIndex = 0;
   }
 
   @Override
@@ -201,9 +199,7 @@ public class CapacityByteArrayOutputStream extends OutputStream {
     if (!currentSlab.hasRemaining()) {
       addSlab(1);
     }
-    currentSlab.put(currentSlabIndex, (byte) b);
-    currentSlabIndex += 1;
-    currentSlab.position(currentSlabIndex);
+    currentSlab.put((byte) b);
     bytesUsed = Math.addExact(bytesUsed, 1);
   }
 
@@ -214,21 +210,16 @@ public class CapacityByteArrayOutputStream extends OutputStream {
       throw new IndexOutOfBoundsException(
           String.format("Given byte array of size %d, with requested 
length(%d) and offset(%d)", b.length, len, off));
     }
-    if (len >= currentSlab.remaining()) {
+    if (len > currentSlab.remaining()) {
       final int length1 = currentSlab.remaining();
       currentSlab.put(b, off, length1);
-      bytesUsed = Math.addExact(bytesUsed, length1);
-      currentSlabIndex += length1;
       final int length2 = len - length1;
       addSlab(length2);
       currentSlab.put(b, off + length1, length2);
-      currentSlabIndex = length2;
-      bytesUsed = Math.addExact(bytesUsed, length2);
     } else {
       currentSlab.put(b, off, len);
-      currentSlabIndex += len;
-      bytesUsed = Math.addExact(bytesUsed, len);
     }
+    bytesUsed = Math.addExact(bytesUsed, len);
   }
 
   private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws 
IOException {
@@ -252,10 +243,9 @@ public class CapacityByteArrayOutputStream extends OutputStream {
    * @exception  IOException  if an I/O error occurs.
    */
   public void writeTo(OutputStream out) throws IOException {
-    for (int i = 0; i < slabs.size() - 1; i++) {
-      writeToOutput(out, slabs.get(i), slabs.get(i).position());
+    for (ByteBuffer slab : slabs) {
+      writeToOutput(out, slab, slab.position());
     }
-    writeToOutput(out, currentSlab, currentSlabIndex);
   }
 
   /**
@@ -290,7 +280,6 @@ public class CapacityByteArrayOutputStream extends OutputStream {
     this.bytesAllocated = 0;
     this.bytesUsed = 0;
     this.currentSlab = EMPTY_SLAB;
-    this.currentSlabIndex = 0;
   }
 
   /**
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
index 89db1981f..b26f486fb 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
@@ -49,6 +49,35 @@ public class TestCapacityByteArrayOutputStream {
     validate(capacityByteArrayOutputStream, v * 3);
   }
 
+  @Test
+  public void testWriteArrayExpand() throws Throwable {
+    CapacityByteArrayOutputStream capacityByteArrayOutputStream = 
newCapacityBAOS(2);
+    assertEquals(0, capacityByteArrayOutputStream.getCapacity());
+
+    byte[] toWrite = {(byte) (1), (byte) (2), (byte) (3), (byte) (4)};
+    int toWriteOffset = 0;
+    int writeLength = 2;
+    // write 2 bytes array
+    capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
+    toWriteOffset += writeLength;
+    assertEquals(2, capacityByteArrayOutputStream.size());
+    assertEquals(2, capacityByteArrayOutputStream.getCapacity());
+
+    // write 1 byte array, expand capacity to 4
+    writeLength = 1;
+    capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
+    toWriteOffset += writeLength;
+    assertEquals(3, capacityByteArrayOutputStream.size());
+    assertEquals(4, capacityByteArrayOutputStream.getCapacity());
+
+    // write 1 byte array, not expand
+    capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
+    assertEquals(4, capacityByteArrayOutputStream.size());
+    assertEquals(4, capacityByteArrayOutputStream.getCapacity());
+    final byte[] byteArray = 
BytesInput.from(capacityByteArrayOutputStream).toByteArray();
+    assertArrayEquals(toWrite, byteArray);
+  }
+
   @Test
   public void testWriteArrayAndInt() throws Throwable {
     CapacityByteArrayOutputStream capacityByteArrayOutputStream = 
newCapacityBAOS(10);

Reply via email to