tustvold commented on code in PR #2369: URL: https://github.com/apache/arrow-rs/pull/2369#discussion_r945124334
########## arrow/src/ipc/compression/codec.rs: ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::buffer::Buffer; +use crate::error::{ArrowError, Result}; +use crate::ipc::CompressionType; +use std::io::{Read, Write}; + +const LENGTH_NO_COMPRESSED_DATA: i64 = -1; +const LENGTH_OF_PREFIX_DATA: i64 = 8; + +#[derive(Debug, Clone, Copy, PartialEq)] +/// Represents compressing an IPC stream using a particular compression algorithm +pub enum CompressionCodec { + Lz4Frame, + Zstd, +} + +impl TryFrom<CompressionType> for CompressionCodec { + type Error = ArrowError; + + fn try_from(compression_type: CompressionType) -> Result<Self> { + match compression_type { + CompressionType::ZSTD => Ok(CompressionCodec::Zstd), + CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame), + other_type => Err(ArrowError::NotYetImplemented(format!( + "compression type {:?} not supported ", + other_type + ))), + } + } +} + +impl CompressionCodec { + /// Compresses the data in `input` and appends the compressed + /// data to `output` using the specified compression mechanism. 
+ /// + /// returns the number of bytes written to the stream + /// + /// Writes this format to output: + /// ```text + /// [8 bytes]: uncompressed length + /// [remaining bytes]: compressed data stream + /// ``` + pub(crate) fn compress_to_vec( + &self, + input: &[u8], + output: &mut Vec<u8>, + ) -> Result<usize> { + let uncompressed_data_len = input.len(); + let original_output_len = output.len(); + + if uncompressed_data_len == 0 { + // empty input, nothing to do + } else { + // write compressed data directly into the output buffer + output.extend_from_slice(&uncompressed_data_len.to_le_bytes()); + self.compress(input, output)?; + + let compression_len = output.len(); + if compression_len > uncompressed_data_len { + // length of compressed data was larger than + // uncompressed data, use the uncompressed data with + // length -1 to indicate that we don't compress the + // data + output.truncate(original_output_len); + output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes()); + output.extend_from_slice(input); + } + } + Ok(output.len() - original_output_len) + } + + /// Decompresses the input into a [`Buffer`] + /// + /// The input should look like: + /// ```text + /// [8 bytes]: uncompressed length + /// [remaining bytes]: compressed data stream + /// ``` + pub(crate) fn decompress_to_buffer(&self, input: &[u8]) -> Result<Buffer> { + // read the first 8 bytes to determine if the data is + // compressed + let decompressed_length = read_uncompressed_size(input); + let buffer = if decompressed_length == 0 { + // empty + let empty = Vec::<u8>::new(); + Buffer::from(empty) + } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA { + // no compression + let data = &input[(LENGTH_OF_PREFIX_DATA as usize)..]; + Buffer::from(data) Review Comment: Yeah... 
Not important for this PR, but the amount of memory copying we are doing seems unfortunate, especially when the major design goal of the IPC spec is to avoid this :sweat_smile: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
