pitrou commented on code in PR #15194:
URL: https://github.com/apache/arrow/pull/15194#discussion_r1087954520
##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -457,17 +457,27 @@ Result<std::shared_ptr<Buffer>> DecompressBuffer(const std::shared_ptr<Buffer>&
   int64_t compressed_size = buf->size() - sizeof(int64_t);
   int64_t uncompressed_size =
       bit_util::FromLittleEndian(util::SafeLoadAs<int64_t>(data));
+  bool is_compressed = uncompressed_size != -1;
+  if (!is_compressed) {
+    uncompressed_size = compressed_size;
+  }
+
   ARROW_ASSIGN_OR_RAISE(auto uncompressed,
                         AllocateBuffer(uncompressed_size, options.memory_pool));
-  ARROW_ASSIGN_OR_RAISE(
-      int64_t actual_decompressed,
-      codec->Decompress(compressed_size, data + sizeof(int64_t), uncompressed_size,
-                        uncompressed->mutable_data()));
-  if (actual_decompressed != uncompressed_size) {
-    return Status::Invalid("Failed to fully decompress buffer, expected ",
-                           uncompressed_size, " bytes but decompressed ",
-                           actual_decompressed);
+  if (is_compressed) {
+    ARROW_ASSIGN_OR_RAISE(
+        int64_t actual_decompressed,
+        codec->Decompress(compressed_size, data + sizeof(int64_t), uncompressed_size,
+                          uncompressed->mutable_data()));
+    if (actual_decompressed != uncompressed_size) {
+      return Status::Invalid("Failed to fully decompress buffer, expected ",
+                             uncompressed_size, " bytes but decompressed ",
+                             actual_decompressed);
+    }
+  } else {
+    std::memcpy(uncompressed->mutable_data(), data + sizeof(int64_t),
+                static_cast<size_t>(uncompressed_size));
Review Comment:
Hmm... instead of allocating a new buffer and copying the data, why not just
slice `buf`?
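For reference, a minimal sketch of what that zero-copy variant could look like (offsets per the diff above; not the PR's actual code):

```cpp
// Uncompressed body: skip the 8-byte length prefix and reuse the existing
// allocation instead of AllocateBuffer + std::memcpy.
if (!is_compressed) {
  return SliceBuffer(buf, sizeof(int64_t), compressed_size);
}
```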
##########
cpp/src/arrow/ipc/writer.cc:
##########
@@ -176,19 +176,43 @@ class RecordBatchSerializer {
                                        field_nodes_, buffer_meta_, options_, &out_->metadata);
   }
+  bool ShouldCompress(int64_t uncompressed_size, int64_t compressed_size) const {
+    auto max_compressed_size = static_cast<int64_t>(
+        std::floor((1.0 - options_.min_space_savings) * uncompressed_size));
+    return compressed_size <= max_compressed_size;
+  }
+
   Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
                         std::shared_ptr<Buffer>* out) {
-    // Convert buffer to uncompressed-length-prefixed compressed buffer
+    // Convert buffer to uncompressed-length-prefixed buffer. The actual body may or may
+    // not be compressed, depending on user-preference and projected size reduction.
     int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
-    ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
+    int64_t prefixed_length = buffer.size();
-    int64_t actual_length;
-    ARROW_ASSIGN_OR_RAISE(actual_length,
+    ARROW_ASSIGN_OR_RAISE(auto result,
+                          AllocateResizableBuffer(maximum_length + sizeof(int64_t)));
+    ARROW_ASSIGN_OR_RAISE(auto actual_length,
                           codec->Compress(buffer.size(), buffer.data(), maximum_length,
                                           result->mutable_data() + sizeof(int64_t)));
+    // FIXME: Not the most sophisticated way to handle this. Ideally, you'd want to avoid
+    // pre-compressing the entire buffer via some kind of sampling method. As the feature
+    // gains adoption, this may become a worthwhile optimization.
+    if (!ShouldCompress(buffer.size(), actual_length)) {
+      if (buffer.size() < actual_length || buffer.size() > maximum_length) {
Review Comment:
I'm not sure why `buffer.size() < actual_length` if you're passing
`/*shrink_to_fit=*/false` below?
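For context, my reading of the fallback path below this hunk (a sketch; the exact lines aren't shown in the diff): it copies the raw body after the length prefix and then shrinks the logical length without reallocating, along the lines of

```cpp
// Raw body after the 8-byte prefix; Resize with shrink_to_fit=false only
// adjusts the logical size downward, so it cannot fail for lack of capacity.
std::memcpy(result->mutable_data() + sizeof(int64_t), buffer.data(), buffer.size());
RETURN_NOT_OK(result->Resize(buffer.size() + sizeof(int64_t), /*shrink_to_fit=*/false));
```

which is why the `buffer.size() < actual_length` half of the guard looks redundant.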
##########
cpp/src/arrow/ipc/writer.cc:
##########
@@ -176,19 +176,43 @@ class RecordBatchSerializer {
                                        field_nodes_, buffer_meta_, options_, &out_->metadata);
   }
+  bool ShouldCompress(int64_t uncompressed_size, int64_t compressed_size) const {
+    auto max_compressed_size = static_cast<int64_t>(
+        std::floor((1.0 - options_.min_space_savings) * uncompressed_size));
+    return compressed_size <= max_compressed_size;
Review Comment:
In extreme cases, `compressed_size` can be larger than `uncompressed_size`
(because compression always has some kind of fixed overhead). But if
`min_space_savings` is 0, we still want to compress to avoid compatibility
issues.
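One way to accommodate that (a sketch; assumes `min_space_savings` is a plain `double` defaulting to 0):

```cpp
bool ShouldCompress(int64_t uncompressed_size, int64_t compressed_size) const {
  // Preserve the legacy always-compress behavior at the default threshold,
  // even when the codec's fixed overhead makes the output slightly larger
  // than the input.
  if (options_.min_space_savings == 0.0) return true;
  auto max_compressed_size = static_cast<int64_t>(
      std::floor((1.0 - options_.min_space_savings) * uncompressed_size));
  return compressed_size <= max_compressed_size;
}
```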
##########
cpp/src/arrow/ipc/options.h:
##########
@@ -67,6 +67,16 @@ struct ARROW_EXPORT IpcWriteOptions {
   /// May only be UNCOMPRESSED, LZ4_FRAME and ZSTD.
   std::shared_ptr<util::Codec> codec;
+  /// @brief Minimum space savings percentage required for compression to be applied
+  ///
+  /// Space savings is calculated as (1.0 - compressed_size / uncompressed_size).
+  ///
+  /// For example, if min_space_savings = 0.1, a 100-byte body buffer won't undergo
+  /// compression if its expected compressed size exceeds 90 bytes. If this option isn't
+  /// set, compression will be used indiscriminately. If no codec was supplied, this
+  /// option is ignored.
Review Comment:
Also, can you add that enabling non-zero savings can make the data
unreadable for pre-12.0.0 Arrow C++ versions?
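Something along these lines, perhaps (suggested wording, not the PR's text):

```cpp
/// Note: enabling this option may make the resulting stream unreadable by
/// Arrow C++ versions prior to 12.0.0, since those readers don't recognize
/// the -1 length prefix that marks an uncompressed buffer.
```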
##########
cpp/src/arrow/ipc/writer.cc:
##########
@@ -176,19 +176,43 @@ class RecordBatchSerializer {
                                        field_nodes_, buffer_meta_, options_, &out_->metadata);
   }
+  bool ShouldCompress(int64_t uncompressed_size, int64_t compressed_size) const {
+    auto max_compressed_size = static_cast<int64_t>(
+        std::floor((1.0 - options_.min_space_savings) * uncompressed_size));
+    return compressed_size <= max_compressed_size;
+  }
+
   Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
                         std::shared_ptr<Buffer>* out) {
-    // Convert buffer to uncompressed-length-prefixed compressed buffer
+    // Convert buffer to uncompressed-length-prefixed buffer. The actual body may or may
+    // not be compressed, depending on user-preference and projected size reduction.
     int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
-    ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
+    int64_t prefixed_length = buffer.size();
-    int64_t actual_length;
-    ARROW_ASSIGN_OR_RAISE(actual_length,
+    ARROW_ASSIGN_OR_RAISE(auto result,
+                          AllocateResizableBuffer(maximum_length + sizeof(int64_t)));
+    ARROW_ASSIGN_OR_RAISE(auto actual_length,
                           codec->Compress(buffer.size(), buffer.data(), maximum_length,
                                           result->mutable_data() + sizeof(int64_t)));
+    // FIXME: Not the most sophisticated way to handle this. Ideally, you'd want to avoid
+    // pre-compressing the entire buffer via some kind of sampling method. As the feature
+    // gains adoption, this may become a worthwhile optimization.
+    if (!ShouldCompress(buffer.size(), actual_length)) {
+      if (buffer.size() < actual_length || buffer.size() > maximum_length) {
Review Comment:
(though it's probably harmless)
##########
cpp/src/arrow/ipc/read_write_test.cc:
##########
@@ -720,6 +771,11 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
   write_options.use_threads = false;
   read_options.use_threads = false;
   CheckRoundtrip(*batch, write_options, read_options);
+
+  ASSERT_OK_AND_ASSIGN(write_options.codec, MockCodec::Create(codec));
+  // 200% savings is impossible, so compression/decompression should be skipped
+  write_options.min_space_savings = 2.0;
Review Comment:
A more interesting check would be a `min_space_savings` between 0 and 1, but
without random (uncompressible) data, to check that compression isn't enabled.
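Roughly (a sketch with hypothetical values; the achievable savings depend on the codec and data):

```cpp
// Data that does compress, but not by 90%: the threshold, not impossibility,
// should be what makes the writer fall back to the uncompressed path.
write_options.min_space_savings = 0.9;
CheckRoundtrip(*batch, write_options, read_options);
// ...plus an assertion that the written body is the uncompressed size.
```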
##########
cpp/src/arrow/ipc/writer.cc:
##########
@@ -176,19 +176,43 @@ class RecordBatchSerializer {
                                        field_nodes_, buffer_meta_, options_, &out_->metadata);
   }
+  bool ShouldCompress(int64_t uncompressed_size, int64_t compressed_size) const {
+    auto max_compressed_size = static_cast<int64_t>(
+        std::floor((1.0 - options_.min_space_savings) * uncompressed_size));
+    return compressed_size <= max_compressed_size;
+  }
+
   Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
                         std::shared_ptr<Buffer>* out) {
-    // Convert buffer to uncompressed-length-prefixed compressed buffer
+    // Convert buffer to uncompressed-length-prefixed buffer. The actual body may or may
+    // not be compressed, depending on user-preference and projected size reduction.
     int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
-    ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
+    int64_t prefixed_length = buffer.size();
-    int64_t actual_length;
-    ARROW_ASSIGN_OR_RAISE(actual_length,
+    ARROW_ASSIGN_OR_RAISE(auto result,
+                          AllocateResizableBuffer(maximum_length + sizeof(int64_t)));
+    ARROW_ASSIGN_OR_RAISE(auto actual_length,
                           codec->Compress(buffer.size(), buffer.data(), maximum_length,
                                           result->mutable_data() + sizeof(int64_t)));
+    // FIXME: Not the most sophisticated way to handle this. Ideally, you'd want to avoid
+    // pre-compressing the entire buffer via some kind of sampling method. As the feature
+    // gains adoption, this may become a worthwhile optimization.
Review Comment:
I've opened GH-33885 for this, which you can reference here.
##########
cpp/src/arrow/ipc/read_write_test.cc:
##########
@@ -720,6 +771,11 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
   write_options.use_threads = false;
   read_options.use_threads = false;
   CheckRoundtrip(*batch, write_options, read_options);
+
+  ASSERT_OK_AND_ASSIGN(write_options.codec, MockCodec::Create(codec));
+  // 200% savings is impossible, so compression/decompression should be skipped
+  write_options.min_space_savings = 2.0;
Review Comment:
(something like `min_space_savings = 2.0` should perhaps raise an error?)
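Perhaps something like this at write time (a sketch; the exact field type and validation site are assumptions):

```cpp
// Reject thresholds outside the meaningful [0, 1] range up front instead of
// silently disabling compression.
if (options.min_space_savings < 0.0 || options.min_space_savings > 1.0) {
  return Status::Invalid("min_space_savings must be in range [0, 1], got ",
                         options.min_space_savings);
}
```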
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]