This is an automated email from the ASF dual-hosted git repository.

william pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 4bc2c5817 ORC-1463: [Java] Support Brotli codec
4bc2c5817 is described below

commit 4bc2c5817a36c459bc1c60b7521a773f0bd58075
Author: Deshan Xiao <[email protected]>
AuthorDate: Mon Jan 1 22:22:04 2024 -0800

    ORC-1463: [Java] Support Brotli codec
    
    ### What changes were proposed in this pull request?
    This PR adds the Brotli codec defined by [RFC 7932](https://www.ietf.org/rfc/rfc7932.txt).
    This is a new PR with an implementation based on [Brotli4j](https://github.com/hyperxpro/Brotli4j). Previous PR: https://github.com/apache/orc/pull/1565
    
    ### Why are the changes needed?
    To support more codecs in ORC.
    
    ### How was this patch tested?
    UT and CI pipeline.
    
    Tested Brotli with the `CompressionWriter` example: it creates a 10.2 KB file, while Snappy produces a 44.2 KB file.
    
    Closes #1714 from deshanxiao/orc-1463.
    
    Lead-authored-by: Deshan Xiao <[email protected]>
    Co-authored-by: deshanxiao <[email protected]>
    Signed-off-by: William Hyun <[email protected]>
---
 java/core/pom.xml                                  |   4 +
 .../src/java/org/apache/orc/CompressionKind.java   |   2 +-
 .../src/java/org/apache/orc/impl/BrotliCodec.java  | 206 +++++++++++++++++++++
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |   1 +
 .../src/java/org/apache/orc/impl/WriterImpl.java   |   3 +
 .../src/test/org/apache/orc/impl/TestBrotli.java   | 155 ++++++++++++++++
 java/pom.xml                                       |   6 +
 7 files changed, 376 insertions(+), 1 deletion(-)

diff --git a/java/core/pom.xml b/java/core/pom.xml
index 3c6fc0f4e..a750911b7 100644
--- a/java/core/pom.xml
+++ b/java/core/pom.xml
@@ -76,6 +76,10 @@
       <groupId>org.threeten</groupId>
       <artifactId>threeten-extra</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>brotli4j</artifactId>
+    </dependency>
 
     <!-- test inter-project -->
     <dependency>
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java
index 339529810..f5615acf8 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/org/apache/orc/CompressionKind.java
@@ -23,5 +23,5 @@ package org.apache.orc;
  * can be applied to ORC files.
  */
 public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
+  NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD, BROTLI // BROTLI (RFC 7932): org.apache.orc.impl.BrotliCodec
 }
diff --git a/java/core/src/java/org/apache/orc/impl/BrotliCodec.java b/java/core/src/java/org/apache/orc/impl/BrotliCodec.java
new file mode 100644
index 000000000..6e45d1a5f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/BrotliCodec.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.decoder.DecoderJNI;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * ORC {@link CompressionCodec} implementing Brotli (RFC 7932) on top of the
+ * Brotli4j JNI bindings.
+ *
+ * <p>{@link #compress} accesses {@code in}, {@code out} and {@code overflow}
+ * through {@code array()}, so it needs heap (array-backed) buffers.
+ * {@link #decompress} only uses {@code ByteBuffer.put}, so it accepts heap
+ * or direct buffers.
+ */
+public class BrotliCodec implements CompressionCodec, DirectDecompressionCodec {
+  // Load the Brotli4j native library when this class is initialized.
+  static {
+    Brotli4jLoader.ensureAvailability();
+  }
+
+  public BrotliCodec() {
+  }
+
+  /**
+   * Brotli encoder settings. {@code quality} and {@code lgwin} start at -1
+   * and are forwarded unchanged to {@link Encoder.Parameters}; the codec
+   * relies on -1 meaning "use the Brotli4j default".
+   */
+  static class BrotliOptions implements Options {
+
+    private Encoder.Mode mode = Encoder.Mode.GENERIC;
+    private int quality = -1;
+    private int lgwin = -1;
+
+    BrotliOptions() {
+
+    }
+
+    BrotliOptions(int quality, int lgwin, Encoder.Mode mode) {
+      this.quality = quality;
+      this.lgwin = lgwin;
+      this.mode = mode;
+    }
+
+    @Override
+    public Options copy() {
+      return new BrotliOptions(quality, lgwin, mode);
+    }
+
+    /**
+     * Maps the ORC speed hint to a Brotli quality level, modifying this
+     * instance: FASTEST -> 0, FAST -> 1, DEFAULT -> -1 (library default).
+     * Any other value leaves the quality unchanged.
+     */
+    @Override
+    public Options setSpeed(SpeedModifier newValue) {
+      switch (newValue) {
+        case FAST:
+          // One level above the fastest setting.
+          quality = 1;
+          break;
+        case DEFAULT:
+          // Let Brotli4j choose its default level (intended as best quality).
+          quality = -1;
+          break;
+        case FASTEST:
+          // Fastest setting.
+          quality = 0;
+          break;
+        default:
+          break;
+      }
+      return this;
+    }
+
+    /**
+     * Selects the Brotli encoder mode, modifying this instance:
+     * BINARY -> GENERIC, TEXT -> TEXT. Any other value leaves the mode unchanged.
+     */
+    @Override
+    public Options setData(DataKind newValue) {
+      switch (newValue) {
+        case BINARY:
+          mode = Encoder.Mode.GENERIC;
+          break;
+        case TEXT:
+          mode = Encoder.Mode.TEXT;
+          break;
+        default:
+          break;
+      }
+      return this;
+    }
+
+    /** Builds the Brotli4j encoder parameters for the current settings. */
+    public Encoder.Parameters brotli4jParameter() {
+      return new Encoder.Parameters()
+          .setQuality(quality).setWindow(lgwin).setMode(mode);
+    }
+  }
+
+  private static final BrotliCodec.BrotliOptions DEFAULT_OPTIONS = new BrotliOptions();
+
+  /**
+   * Returns a fresh copy of the default options.
+   *
+   * <p>{@link BrotliOptions#setSpeed} and {@link BrotliOptions#setData} mutate
+   * the instance they are called on, so handing out the shared static
+   * instance would let one caller silently change the defaults for every
+   * other caller.
+   */
+  @Override
+  public Options getDefaultOptions() {
+    return DEFAULT_OPTIONS.copy();
+  }
+
+  /**
+   * Compresses the remaining bytes of {@code in} (its position is not
+   * advanced).
+   *
+   * <p>If the Brotli output is not smaller than the input, nothing is written
+   * and {@code false} is returned. Otherwise the compressed bytes are written
+   * into {@code out}; when {@code out} fills up, the rest is written to the
+   * start of {@code overflow}, whose position is set to the number of
+   * spilled bytes.
+   *
+   * @return true if compressed data was written, false if it would not shrink
+   */
+  @Override
+  public boolean compress(
+      ByteBuffer in,
+      ByteBuffer out,
+      ByteBuffer overflow,
+      Options options) throws IOException {
+    BrotliOptions brotliOptions = (BrotliOptions) options;
+    int inBytes = in.remaining();
+    byte[] compressed = Encoder.compress(
+        in.array(), in.arrayOffset() + in.position(), inBytes,
+        brotliOptions.brotli4jParameter());
+    int outBytes = compressed.length;
+    if (outBytes < inBytes) {
+      int remaining = out.remaining();
+      if (remaining >= outBytes) {
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), outBytes);
+        out.position(out.position() + outBytes);
+      } else {
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), remaining);
+        out.position(out.limit());
+        System.arraycopy(compressed, remaining, overflow.array(),
+            overflow.arrayOffset(), outBytes - remaining);
+        overflow.position(outBytes - remaining);
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Decompresses all remaining bytes of {@code in} into {@code out}, then
+   * flips {@code out} so it is ready to be read.
+   *
+   * @throws IOException if the compressed stream is truncated or the decoder
+   *     reports an error; previously these cases returned silently with
+   *     partial output
+   */
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    int compressedBytes = in.remaining();
+    // A fresh native decoder per call; it is always released in finally.
+    DecoderJNI.Wrapper decoder = new DecoderJNI.Wrapper(compressedBytes);
+    try {
+      decoder.getInputBuffer().put(in);
+      decoder.push(compressedBytes);
+      while (decoder.getStatus() != DecoderJNI.Status.DONE) {
+        switch (decoder.getStatus()) {
+          case OK:
+            decoder.push(0);
+            break;
+
+          case NEEDS_MORE_OUTPUT:
+            out.put(decoder.pull());
+            break;
+
+          case NEEDS_MORE_INPUT:
+            // Give the decoder a chance to process the remaining buffered bytes.
+            decoder.push(0);
+            // All input was pushed up front, so still needing input means the
+            // compressed stream is truncated.
+            if (decoder.getStatus() == DecoderJNI.Status.NEEDS_MORE_INPUT) {
+              throw new IOException("Brotli decompression failed: truncated input");
+            }
+            break;
+
+          default:
+            throw new IOException("Brotli decompression failed: corrupted input (decoder status "
+                + decoder.getStatus() + ")");
+        }
+      }
+    } finally {
+      out.flip();
+      decoder.destroy();
+    }
+  }
+
+  /**
+   * Returns true unconditionally; loading of the native library is handled
+   * by {@code Brotli4jLoader.ensureAvailability()} in the static initializer
+   * (presumably it fails class initialization if the library is missing).
+   */
+  @Override
+  public boolean isAvailable() {
+    return true;
+  }
+
+  @Override
+  public CompressionKind getKind() {
+    return CompressionKind.BROTLI;
+  }
+
+  /** Same as {@link #decompress}, which handles both direct and heap buffers. */
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    decompress(in, out);
+  }
+
+  /** No-op: each decompress call creates and destroys its own decoder. */
+  @Override
+  public void reset() {
+  }
+
+  /** No-op: this codec holds no native resources between calls. */
+  @Override
+  public void destroy() {
+  }
+
+  /** Returns this codec to {@link OrcCodecPool} for reuse. */
+  @Override
+  public void close() {
+    OrcCodecPool.returnCodec(CompressionKind.BROTLI, this);
+  }
+}
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index f2e150cea..b80094b58 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -654,6 +654,7 @@ public class ReaderImpl implements Reader {
       case LZO:
       case LZ4:
       case ZSTD:
+      case BROTLI:
         break;
       default:
         throw new IllegalArgumentException("Unknown compression");
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index b5bb64387..9100aa374 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -290,6 +290,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
       case ZSTD:
         return new AircompressorCodec(kind, new ZstdCompressor(),
             new ZstdDecompressor());
+      case BROTLI:
+        return new BrotliCodec();
       default:
         throw new IllegalArgumentException("Unknown compression codec: " +
             kind);
@@ -579,6 +581,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
       case LZO: return OrcProto.CompressionKind.LZO;
       case LZ4: return OrcProto.CompressionKind.LZ4;
       case ZSTD: return OrcProto.CompressionKind.ZSTD;
+      case BROTLI: return OrcProto.CompressionKind.BROTLI;
       default:
         throw new IllegalArgumentException("Unknown compression " + kind);
     }
diff --git a/java/core/src/test/org/apache/orc/impl/TestBrotli.java b/java/core/src/test/org/apache/orc/impl/TestBrotli.java
new file mode 100644
index 000000000..e5d23ca45
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestBrotli.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Compression/decompression round-trip tests for {@link BrotliCodec}.
+ */
+public class TestBrotli {
+  @Test
+  public void testOutputLargerThanBefore() {
+    ByteBuffer in = ByteBuffer.allocate(10);
+    ByteBuffer out = ByteBuffer.allocate(10);
+    in.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 10});
+    in.flip();
+    try (BrotliCodec brotliCodec = new BrotliCodec()) {
+      // Brotli output for these 8 bytes is not smaller than the input, so
+      // compress() declines and returns false.
+      assertFalse(brotliCodec.compress(in, out, null,
+          brotliCodec.getDefaultOptions()));
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+  @Test
+  public void testCompress() {
+    ByteBuffer in = ByteBuffer.allocate(10000);
+    ByteBuffer out = ByteBuffer.allocate(500);
+    ByteBuffer result = ByteBuffer.allocate(10000);
+    for (int i = 0; i < 10000; i++) {
+      in.put((byte) i);
+    }
+    in.flip();
+    try (BrotliCodec brotliCodec = new BrotliCodec()) {
+      assertTrue(brotliCodec.compress(in, out, null,
+          brotliCodec.getDefaultOptions()));
+      out.flip();
+      brotliCodec.decompress(out, result);
+      assertArrayEquals(in.array(), result.array());
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+  @Test
+  public void testCompressNotFromStart() {
+    ByteBuffer in = ByteBuffer.allocate(10000);
+    ByteBuffer out = ByteBuffer.allocate(10000);
+    ByteBuffer result = ByteBuffer.allocate(10000);
+    for (int i = 0; i < 10000; i++) {
+      in.put((byte) i);
+    }
+    in.flip();
+    // Skip the first byte so compression starts at a non-zero position.
+    in.get();
+
+    ByteBuffer slice = in.slice();
+    byte[] originalBytes = new byte[slice.remaining()];
+    slice.get(originalBytes);
+
+    try (BrotliCodec brotliCodec = new BrotliCodec()) {
+      // The repetitive input compresses to fewer bytes than the original.
+      assertTrue(brotliCodec.compress(in, out, null,
+          brotliCodec.getDefaultOptions()));
+
+      out.flip();
+      brotliCodec.decompress(out, result);
+
+      byte[] resultArray = new byte[result.remaining()];
+      result.get(resultArray);
+      assertArrayEquals(originalBytes, resultArray);
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+  @Test
+  public void testCompressWithOverflow() {
+    ByteBuffer in = ByteBuffer.allocate(10000);
+    ByteBuffer out = ByteBuffer.allocate(1);
+    ByteBuffer overflow = ByteBuffer.allocate(10000);
+    ByteBuffer result = ByteBuffer.allocate(10000);
+    for (int i = 0; i < 10000; i++) {
+      in.put((byte) i);
+    }
+    in.flip();
+    try (BrotliCodec brotliCodec = new BrotliCodec()) {
+      assertTrue(brotliCodec.compress(in, out, overflow,
+          brotliCodec.getDefaultOptions()));
+      out.flip();
+      overflow.flip();
+
+      // Concatenate out and overflow into one compressed byte array.
+      byte[] compressed = new byte[out.remaining() + overflow.remaining()];
+      System.arraycopy(out.array(), out.arrayOffset() + out.position(),
+          compressed, 0, out.remaining());
+      System.arraycopy(overflow.array(), overflow.arrayOffset() + overflow.position(),
+          compressed, out.remaining(), overflow.remaining());
+      // Decompress the concatenated bytes and check the result.
+      ByteBuffer compressedBuffer = ByteBuffer.allocate(compressed.length);
+      compressedBuffer.put(compressed);
+      compressedBuffer.flip();
+      brotliCodec.decompress(compressedBuffer, result);
+      assertArrayEquals(in.array(), result.array());
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+  @Test
+  public void testDirectDecompress() {
+    ByteBuffer in = ByteBuffer.allocate(10000);
+    ByteBuffer out = ByteBuffer.allocate(10000);
+    ByteBuffer directOut = ByteBuffer.allocateDirect(10000);
+    ByteBuffer directResult = ByteBuffer.allocateDirect(10000);
+    for (int i = 0; i < 10000; i++) {
+      in.put((byte) i);
+    }
+    in.flip();
+    try (BrotliCodec brotliCodec = new BrotliCodec()) {
+      // Compress into a heap buffer.
+      assertTrue(brotliCodec.compress(in, out, null,
+          brotliCodec.getDefaultOptions()));
+      out.flip();
+      // Copy only the compressed bytes into the direct buffer. Copying
+      // out.array() would append the unused, zero-filled tail of the heap
+      // buffer after the end of the Brotli stream.
+      directOut.put(out);
+      directOut.flip();
+
+      brotliCodec.directDecompress(directOut, directResult);
+
+      // Copy the decompressed bytes from the direct buffer to the heap.
+      byte[] heapBytes = new byte[directResult.remaining()];
+      directResult.get(heapBytes);
+
+      assertArrayEquals(in.array(), heapBytes);
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+}
diff --git a/java/pom.xml b/java/pom.xml
index d597144f7..b3ef97772 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -60,6 +60,7 @@
   </modules>
 
   <properties>
+    <brotli4j.version>1.15.0</brotli4j.version>
     <checkstyle.version>10.12.0</checkstyle.version>
     <example.dir>${project.basedir}/../../examples</example.dir>
     <hadoop.version>3.3.6</hadoop.version>
@@ -202,6 +203,11 @@
         <artifactId>threeten-extra</artifactId>
         <version>1.7.1</version>
       </dependency>
+      <dependency>
+        <groupId>com.aayushatharva.brotli4j</groupId>
+        <artifactId>brotli4j</artifactId>
+        <version>${brotli4j.version}</version>
+      </dependency>
 
       <!-- test inter-project -->
       <dependency>

Reply via email to