emkornfield commented on a change in pull request #8949: URL: https://github.com/apache/arrow/pull/8949#discussion_r595777904
########## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; Review comment: ahh this is where netty is used. we don't have an arrow wrapper for it? 
########## File path: java/compression/pom.xml ########## @@ -0,0 +1,56 @@ +<?xml version="1.0"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-java-root</artifactId> + <version>4.0.0-SNAPSHOT</version> + </parent> + <artifactId>arrow-compression</artifactId> + <name>Arrow Compression</name> + <description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-format</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${project.version}</version> + <classifier>${arrow.vector.classifier}</classifier> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + 
<groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-unsafe</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.20</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> Review comment: hmm, wonder why netty is required here though, I'll take a closer look. ########## File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java ########## @@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) } /** - * Creates the {@link CompressionCodec} given the compression type. + * Process compression by compressing the buffer as is. */ - public static CompressionCodec createCodec(byte compressionType) { - switch (compressionType) { - case NoCompressionCodec.COMPRESSION_TYPE: - return NoCompressionCodec.INSTANCE; - default: - throw new IllegalArgumentException("Compression type not supported: " + compressionType); - } + public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); + compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH); + compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex()); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); + return compressedBuffer; + } + + /** + * Process decompression by decompressing the buffer as is. Review comment: please update the docs to match, something like. "Slice the buffer to contain the uncompressed bytes" ########## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit"); + + if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; + } + + try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { + // compressed buffer is larger, send the raw buffer + compressedBuffer.close(); + compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); + } + + byte[] outBytes = baos.toByteArray(); + + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + + long uncompressedLength = uncompressedBuffer.writerIndex(); + if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); + } + // first 8 bytes reserved for uncompressed length, to be consistent with the + // C++ implementation. + compressedBuffer.setLong(0, uncompressedLength); + + PlatformDependent.copyMemory( + outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit"); + + Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, + "Not enough data to decompress."); + + long decompressedLength = compressedBuffer.getLong(0); + if (!MemoryUtil.LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } + + if (decompressedLength == 0L) { + // shortcut for empty buffer + compressedBuffer.close(); + return allocator.getEmpty(); + } + + if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) { + // no compression + return CompressionUtil.decompressRawBuffer(compressedBuffer); + } + + try { + ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer); + compressedBuffer.close(); + return decompressedBuffer; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException { + long decompressedLength = compressedBuffer.getLong(0); + if (!MemoryUtil.LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } + + byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; + PlatformDependent.copyMemory( + compressedBuffer.memoryAddress() + 
CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length); + ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); + try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { + IOUtils.copy(in, out); + } + + byte[] outBytes = out.toByteArray(); + ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); + PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length); + decompressedBuffer.writerIndex(decompressedLength); + return decompressedBuffer; + } + + @Override + public String getCodecName() { + return CompressionType.name(CompressionType.LZ4_FRAME); Review comment: With the new enum, maybe we can make this an accessor that returns an enum instead? And then the byte can be extracted from there where necessary? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org