This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 22a06b4e7 ORC-998: Sharing compression output buffer among treeWriter: 
Refactoring within OutStream for portability
22a06b4e7 is described below

commit 22a06b4e7575f539be970ec9658f5b597ef7c83d
Author: mystic-lama <[email protected]>
AuthorDate: Sun Nov 19 19:50:54 2023 -0800

    ORC-998: Sharing compression output buffer among treeWriter: Refactoring 
within OutStream for portability
    
    ### What changes were proposed in this pull request?
    There's individual instance of OutStream within each TreeWriter created by 
WriterContext#createStream method. Within OutStream, there are totally 3 
buffers:
    
    current: the regular input buffer holding uncompressed, unencrypted bytes.
    compress: the output buffer holding compressed bytes
    overflow: same as "compress" but only used when the last compression output 
is larger than remaining capacity of compress buffer.
    Potentially the compress and overflow buffer don't have to be allocated 
individually within each OutStream object, but shared across all of them so to 
save memory allocation.
    
    This PR is the first step for sharing the compression output buffer, which 
refactors internal of OutStream and make the relevant object bundled together 
since they are logically related(and details of dealing with overflow doesn't 
have to be visible). This refactoring makes it easier to share the compression 
output buffer as a pass-in arguments in the follow-up PR.
    
    ### Why are the changes needed?
    For the context of 
[ORC-997](https://issues.apache.org/jira/browse/ORC-997), this change makes the 
compression output buffer, as a single entity, easier to be shared and passed 
in from caller.
    
    ### How was this patch tested?
    There's no functional changes from this PR so passing all existed unit 
tests. Also added a new test in TestOutStream to make sure that, the scenario 
where codec.compress() returns false (meaning the compression output is larger 
than original input) is also covered in the unit test.
    
    This closes #909
    
    Closes #1633 from mystic-lama/autumnust-refactor_compress_buffer.
    
    Lead-authored-by: mystic-lama <[email protected]>
    Co-authored-by: Lei Sun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/java/org/apache/orc/impl/OutStream.java    | 219 +++++++++++++--------
 .../test/org/apache/orc/impl/TestOutStream.java    |  51 +++++
 2 files changed, 192 insertions(+), 78 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java 
b/java/core/src/java/org/apache/orc/impl/OutStream.java
index d71b558b6..686f168a2 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -58,21 +58,10 @@ public class OutStream extends PositionedOutputStream {
   private ByteBuffer current = null;
 
   /**
-   * Stores the compressed bytes until we have a full buffer and then outputs
-   * them to the receiver. If no compression is being done, this (and overflow)
-   * will always be null and the current buffer will be sent directly to the
-   * receiver.
+   * Lazily initialized: Won't allocate byte buffer until invocation of init()
    */
-  private ByteBuffer compressed = null;
+  public OutputCompressedBuffer compressedBuffer = new 
OutputCompressedBuffer();
 
-  /**
-   * Since the compressed buffer may start with contents from previous
-   * compression blocks, we allocate an overflow buffer so that the
-   * output of the codec can be split between the two buffers. After the
-   * compressed buffer is sent to the receiver, the overflow buffer becomes
-   * the new compressed buffer.
-   */
-  private ByteBuffer overflow = null;
   private final int bufferSize;
   private final CompressionCodec codec;
   private final CompressionCodec.Options options;
@@ -223,13 +212,6 @@ public class OutStream extends PositionedOutputStream {
     }
   }
 
-  /**
-   * Allocate a new output buffer if we are compressing.
-   */
-  private ByteBuffer getNewOutputBuffer() {
-    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
-  }
-
   private void flip() {
     current.limit(current.position());
     current.position(codec == null ? 0 : HEADER_SIZE);
@@ -266,7 +248,130 @@ public class OutStream extends PositionedOutputStream {
     }
   }
 
-  private void spill() throws java.io.IOException {
+
+  /**
+   * An abstraction over compressed buffer and the overflow associated with it.
+   * See comments for {@link #compressed} and {@link #overflow} for details.
+   */
+  private class OutputCompressedBuffer {
+    /**
+     * Stores the compressed bytes until we have a full buffer and then outputs
+     * them to the receiver. If no compression is being done, this (and 
overflow)
+     * will always be null and the current buffer will be sent directly to the
+     * receiver.
+     */
+    ByteBuffer compressed = null;
+
+    /**
+     * Since the compressed buffer may start with contents from previous
+     * compression blocks, we allocate an overflow buffer so that the
+     * output of the codec can be split between the two buffers. After the
+     * compressed buffer is sent to the receiver, the overflow buffer becomes
+     * the new compressed buffer.
+     */
+    ByteBuffer overflow = null;
+
+    public void init() {
+      if (compressed == null) {
+        compressed = getNewOutputBuffer();
+      } else if (overflow == null) {
+        overflow = getNewOutputBuffer();
+      }
+    }
+
+    public int getCurrentPosn() {
+      if (compressed != null) {
+        return compressed.position();
+      } else {
+        throw new IllegalStateException("Output Compression buffer not being 
init'ed properly");
+      }
+    }
+
+    public void advanceTo(int newPosn) {
+      compressed.position(newPosn);
+    }
+
+    public int getCapacity() {
+      int result = 0;
+
+      if (compressed != null) {
+        result += compressed.capacity();
+      }
+      if (overflow != null) {
+        result += overflow.capacity();
+      }
+
+      return result;
+    }
+
+    /**
+     * Commit the compression by
+     * 1) Writer header,
+     * 2) Checking if buffer is filled (so to be sent to
+     * {@link org.apache.orc.PhysicalWriter.OutputReceiver})and prepare for 
upcoming compression.
+     *
+     * @return the length of total compressed bytes.
+     */
+    public long commitCompress(int startPosn) throws IOException {
+      // find the total bytes in the chunk
+      int totalBytes = compressed.position() - startPosn - HEADER_SIZE;
+      if (overflow != null) {
+        totalBytes += overflow.position();
+      }
+      writeHeader(compressed, startPosn, totalBytes, false);
+      // if we have less than the next header left, spill it.
+      if (compressed.remaining() < HEADER_SIZE) {
+        compressed.flip();
+        outputBuffer(compressed);
+        compressed = overflow;
+        overflow = null;
+      }
+      return totalBytes + HEADER_SIZE;
+    }
+
+    public void abortCompress(int currentPosn) throws IOException {
+      // we are using the original, but need to spill the current
+      // compressed buffer first for ordering. So back up to where we started,
+      // flip it and add it to done.
+      if (currentPosn != 0) {
+        compressed.position(currentPosn);
+        compressed.flip();
+        outputBuffer(compressed);
+        compressed = null;
+        // if we have an overflow, clear it and make it the new compress
+        // buffer
+        if (overflow != null) {
+          overflow.clear();
+          compressed = overflow;
+          overflow = null;
+        }
+      } else {
+        compressed.clear();
+        if (overflow != null) {
+          overflow.clear();
+        }
+      }
+    }
+
+    public void reset() throws IOException {
+      if (compressed != null && compressed.position() != 0) {
+        compressed.flip();
+        outputBuffer(compressed);
+      }
+
+      compressed = null;
+      overflow = null;
+    }
+
+    /**
+     * Allocate a new output buffer if we are compressing.
+     */
+    private ByteBuffer getNewOutputBuffer() {
+      return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+    }
+  }
+
+  private void spill() throws IOException {
     // if there isn't anything in the current buffer, don't spill
     if (current == null ||
         current.position() == (codec == null ? 0 : HEADER_SIZE)) {
@@ -277,56 +382,25 @@ public class OutStream extends PositionedOutputStream {
       outputBuffer(current);
       getNewInputBuffer();
     } else {
-      if (compressed == null) {
-        compressed = getNewOutputBuffer();
-      } else if (overflow == null) {
-        overflow = getNewOutputBuffer();
-      }
-      int sizePosn = compressed.position();
-      compressed.position(compressed.position() + HEADER_SIZE);
-      if (codec.compress(current, compressed, overflow, options)) {
-        uncompressedBytes = 0;
+      compressedBuffer.init();
+      int currentPosn = compressedBuffer.getCurrentPosn();
+      compressedBuffer.advanceTo(currentPosn + HEADER_SIZE);
+
+      // Worth compression
+      if (codec.compress(current, compressedBuffer.compressed,
+          compressedBuffer.overflow, options)) {
         // move position back to after the header
+        uncompressedBytes = 0;
+
         current.position(HEADER_SIZE);
         current.limit(current.capacity());
-        // find the total bytes in the chunk
-        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
-        if (overflow != null) {
-          totalBytes += overflow.position();
-        }
-        compressedBytes += totalBytes + HEADER_SIZE;
-        writeHeader(compressed, sizePosn, totalBytes, false);
-        // if we have less than the next header left, spill it.
-        if (compressed.remaining() < HEADER_SIZE) {
-          compressed.flip();
-          outputBuffer(compressed);
-          compressed = overflow;
-          overflow = null;
-        }
+
+        compressedBytes += compressedBuffer.commitCompress(currentPosn);
       } else {
         compressedBytes += uncompressedBytes + HEADER_SIZE;
         uncompressedBytes = 0;
-        // we are using the original, but need to spill the current
-        // compressed buffer first. So back up to where we started,
-        // flip it and add it to done.
-        if (sizePosn != 0) {
-          compressed.position(sizePosn);
-          compressed.flip();
-          outputBuffer(compressed);
-          compressed = null;
-          // if we have an overflow, clear it and make it the new compress
-          // buffer
-          if (overflow != null) {
-            overflow.clear();
-            compressed = overflow;
-            overflow = null;
-          }
-        } else {
-          compressed.clear();
-          if (overflow != null) {
-            overflow.clear();
-          }
-        }
+
+        compressedBuffer.abortCompress(currentPosn);
 
         // now add the current buffer into the done list and get a new one.
         current.position(0);
@@ -351,17 +425,12 @@ public class OutStream extends PositionedOutputStream {
   @Override
   public void flush() throws IOException {
     spill();
-    if (compressed != null && compressed.position() != 0) {
-      compressed.flip();
-      outputBuffer(compressed);
-    }
+    compressedBuffer.reset();
     if (cipher != null) {
       finishEncryption();
     }
-    compressed = null;
     uncompressedBytes = 0;
     compressedBytes = 0;
-    overflow = null;
     current = null;
   }
 
@@ -379,13 +448,7 @@ public class OutStream extends PositionedOutputStream {
       if (current != null) {
         result += current.capacity();
       }
-      if (compressed != null) {
-        result += compressed.capacity();
-      }
-      if (overflow != null) {
-        result += overflow.capacity();
-      }
-      return result + compressedBytes;
+      return result + compressedBuffer.getCapacity() + compressedBytes;
     }
   }
 
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java 
b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index a67331464..aa3e9ec4f 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -35,6 +35,7 @@ import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.Key;
+import java.util.Random;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -59,6 +60,56 @@ public class TestOutStream {
     }
   }
 
+  /**
+   * Creates randomness into whether a compression should be committed or 
aborted (so that
+   * the isOriginal bits is set to true and data being flushed as 
uncompressed).
+   * This class should be used for testing purpose only.
+   */
+  private static class TestZlibCodec extends ZlibCodec {
+    private Random rand = new Random();
+
+    @Override
+    public boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer 
overflow, Options options) {
+      super.compress(in, out, overflow, options);
+      return rand.nextBoolean();
+    }
+  }
+
+  @Test
+  public void testCompressWithoutEncryption() throws Exception {
+    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+    CompressionCodec codec = new TestZlibCodec();
+    StreamOptions options = new StreamOptions(1024)
+        .withCodec(codec, codec.getDefaultOptions());
+
+    try (OutStream stream = new OutStream("test", options, receiver)) {
+      for (int i = 0; i < 20000; ++i) {
+        stream.write(("The Cheesy Poofs " + i + "\n")
+            .getBytes(StandardCharsets.UTF_8));
+      }
+      stream.flush();
+    }
+
+    byte[] compressed = receiver.buffer.get();
+
+    // use InStream to decompress it
+    BufferChunkList ranges = new BufferChunkList();
+    ranges.add(new BufferChunk(ByteBuffer.wrap(compressed), 0));
+    try (InStream decompressedStream = InStream.create("test", ranges.get(), 0,
+        compressed.length,
+        InStream.options().withCodec(new 
TestZlibCodec()).withBufferSize(1024));
+        BufferedReader reader
+            = new BufferedReader(new InputStreamReader(decompressedStream,
+            StandardCharsets.UTF_8))) {
+      // check the contents of the decompressed stream
+      for (int i = 0; i < 20000; ++i) {
+        assertEquals("The Cheesy Poofs " + i, reader.readLine(), "i = " + i);
+      }
+      assertNull(reader.readLine());
+    }
+
+  }
+
   @Test
   public void testAssertBufferSizeValid() {
     try {

Reply via email to