martin-g commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r897835928
##########
arrow/src/ipc/reader.rs:
##########
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
use ipc::CONTINUATION_MARKER;
use DataType::*;
+use crate::ipc::{CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA,
LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};
+
/// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From
https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], compression_codec:
&CompressionCodecType) -> Buffer {
let start_offset = buf.offset() as usize;
let end_offset = start_offset + buf.length() as usize;
let buf_data = &a_data[start_offset..end_offset];
- Buffer::from(&buf_data)
+ match compression_codec {
+ CompressionCodecType::NoCompression => {
+ Buffer::from(buf_data)
+ }
+ CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+ // 8byte + data
+ // read the first 8 bytes
+ // if the data is compressed, decompress the data, otherwise
decompress data.
+ let decompressed_length = read_uncompressed_size(buf_data);
+ if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+ // empty
+ let empty = Vec::<u8>::new();
+ Buffer::from(empty)
+ } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+ // not compress
+ let data = &buf_data[(LENGTH_OF_PREFIX_DATA as
usize)..(end_offset - start_offset)];
+ Buffer::from(data)
+ } else {
+ // decompress data using the codec
+ let mut uncompressed_buffer = Vec::new();
+ let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as
usize)..(end_offset - start_offset)];
Review Comment:
```suggestion
let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as
usize)..];
```
##########
arrow/src/ipc/reader.rs:
##########
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
use ipc::CONTINUATION_MARKER;
use DataType::*;
+use crate::ipc::{CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA,
LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};
+
/// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From
https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], compression_codec:
&CompressionCodecType) -> Buffer {
let start_offset = buf.offset() as usize;
let end_offset = start_offset + buf.length() as usize;
let buf_data = &a_data[start_offset..end_offset];
- Buffer::from(&buf_data)
+ match compression_codec {
+ CompressionCodecType::NoCompression => {
+ Buffer::from(buf_data)
+ }
+ CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+ // 8byte + data
+ // read the first 8 bytes
+ // if the data is compressed, decompress the data, otherwise
decompress data.
Review Comment:
```suggestion
// if the data is compressed, decompress the data, otherwise
return as is
```
##########
arrow/src/ipc/writer.rs:
##########
@@ -37,6 +38,9 @@ use crate::record_batch::RecordBatch;
use crate::util::bit_util;
use ipc::CONTINUATION_MARKER;
+use crate::ipc::{BodyCompressionMethod, CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA,
LENGTH_OF_PREFIX_DATA};
Review Comment:
```suggestion
use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA,
LENGTH_OF_PREFIX_DATA, LENGTH_NO_COMPRESSED_DATA};
```
##########
arrow/src/ipc/writer.rs:
##########
@@ -912,19 +945,64 @@ fn write_array_data(
}
/// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From
https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
fn write_buffer(
buffer: &Buffer,
buffers: &mut Vec<ipc::Buffer>,
arrow_data: &mut Vec<u8>,
offset: i64,
+ compression_codec: &CompressionCodecType
) -> i64 {
- let len = buffer.len();
- let pad_len = pad_to_8(len as u32);
- let total_len: i64 = (len + pad_len) as i64;
+ let origin_buffer_len = buffer.len();
+ let mut compression_buffer = Vec::<u8>::new();
+ let (data, uncompression_buffer_len) = match compression_codec {
+ CompressionCodecType::NoCompression => {
+ // this buffer_len will not be used in the following logic
+ // If we don't use the compression, just write the data in the
array
+ (buffer.as_slice(), origin_buffer_len as i64)
+ }
+ CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+ if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+ (buffer.as_slice(), 0)
+ } else {
+ compression_codec.compress(buffer.as_slice(), &mut
compression_buffer).unwrap();
+ if compression_buffer.len() > origin_buffer_len {
+ // the length of compressed data is larger than
uncompressed data
+ // use the uncompressed data with -1
+ // -1 indicate that we don't compress the data
+ (buffer.as_slice(), -1)
Review Comment:
```suggestion
(buffer.as_slice(), LENGTH_NO_COMPRESSED_DATA)
```
##########
arrow/src/ipc/reader.rs:
##########
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
use ipc::CONTINUATION_MARKER;
use DataType::*;
+use crate::ipc::{CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA,
LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};
+
/// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From
https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], compression_codec:
&CompressionCodecType) -> Buffer {
let start_offset = buf.offset() as usize;
let end_offset = start_offset + buf.length() as usize;
let buf_data = &a_data[start_offset..end_offset];
- Buffer::from(&buf_data)
+ match compression_codec {
+ CompressionCodecType::NoCompression => {
+ Buffer::from(buf_data)
+ }
+ CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+ // 8byte + data
+ // read the first 8 bytes
+ // if the data is compressed, decompress the data, otherwise
decompress data.
+ let decompressed_length = read_uncompressed_size(buf_data);
+ if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+ // empty
+ let empty = Vec::<u8>::new();
+ Buffer::from(empty)
+ } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+ // not compress
+ let data = &buf_data[(LENGTH_OF_PREFIX_DATA as
usize)..(end_offset - start_offset)];
Review Comment:
```suggestion
let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]