liyafan82 commented on a change in pull request #9769: URL: https://github.com/apache/arrow/pull/9769#discussion_r601039085
########## File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java ########## @@ -38,109 +36,39 @@ /** * Compression codec for the LZ4 algorithm. */ -public class Lz4CompressionCodec implements CompressionCodec { +public class Lz4CompressionCodec extends AbstractCompressionCodec { - @Override - public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { - Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, - "The uncompressed buffer size exceeds the integer limit"); - - if (uncompressedBuffer.writerIndex() == 0L) { - // shortcut for empty buffer - ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); - compressedBuffer.setLong(0, 0); - compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); - uncompressedBuffer.close(); - return compressedBuffer; - } - - try { - ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); - long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; - if (compressedLength > uncompressedBuffer.writerIndex()) { - // compressed buffer is larger, send the raw buffer - compressedBuffer.close(); - compressedBuffer = CompressionUtil.packageRawBuffer(allocator, uncompressedBuffer); - } - - uncompressedBuffer.close(); - return compressedBuffer; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { + protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (InputStream in = new ByteArrayInputStream(inBytes); OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { IOUtils.copy(in, out); + } catch (IOException e) { + throw new RuntimeException(e); } byte[] outBytes = baos.toByteArray(); ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); - - long uncompressedLength = uncompressedBuffer.writerIndex(); - if (!MemoryUtil.LITTLE_ENDIAN) { - uncompressedLength = Long.reverseBytes(uncompressedLength); - } - // first 8 bytes reserved for uncompressed length, according to the specification - compressedBuffer.setLong(0, uncompressedLength); - PlatformDependent.copyMemory( outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); return compressedBuffer; } - @Override - public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { - Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, - "The compressed buffer size exceeds the integer limit"); - - Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, - "Not enough data to decompress."); - - long decompressedLength = compressedBuffer.getLong(0); - if (!MemoryUtil.LITTLE_ENDIAN) { - decompressedLength = Long.reverseBytes(decompressedLength); - } - - if (decompressedLength == 0L) { - // shortcut for empty buffer - compressedBuffer.close(); - return allocator.getEmpty(); - } - - if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) { - // no compression - return CompressionUtil.extractUncompressedBuffer(compressedBuffer); - } - - try { - ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer); - compressedBuffer.close(); - return decompressedBuffer; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException { - long decompressedLength = compressedBuffer.getLong(0); - if (!MemoryUtil.LITTLE_ENDIAN) { - decompressedLength = Long.reverseBytes(decompressedLength); - } + protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { Review comment: Revised. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org