pitrou commented on a change in pull request #9680:
URL: https://github.com/apache/arrow/pull/9680#discussion_r594299141
##########
File path: cpp/src/arrow/util/compression_test.cc
##########
@@ -519,6 +519,44 @@ TEST_P(CodecTest, StreamingDecompressorReuse) {
CheckStreamingRoundtrip(compressor, decompressor, data);
}
+TEST_P(CodecTest, StreamingMultiFlush) {
+ // Regression test for ARROW-11937
+ if (GetCompression() == Compression::SNAPPY) {
+ GTEST_SKIP() << "snappy doesn't support streaming decompression";
+ }
+ if (GetCompression() == Compression::LZ4 ||
+ GetCompression() == Compression::LZ4_HADOOP) {
+ GTEST_SKIP() << "LZ4 raw format doesn't support streaming decompression.";
+ }
+ auto type = GetCompression();
+ ASSERT_OK_AND_ASSIGN(auto codec, Codec::Create(type));
+
+ std::shared_ptr<Compressor> compressor;
+ ASSERT_OK_AND_ASSIGN(compressor, codec->MakeCompressor());
+
+ // Grow the buffer and flush again while requested (up to a bounded number of times)
+ std::vector<uint8_t> compressed(1024);
+ Compressor::FlushResult result;
+ int attempts = 0;
+ int64_t actual_size = 0;
+ int64_t output_len = 0;
+ uint8_t* output = compressed.data();
+ do {
+ compressed.resize(compressed.capacity() * 2);
+ output_len = compressed.size() - actual_size;
+ output = compressed.data() + actual_size;
+ ASSERT_OK_AND_ASSIGN(result, compressor->Flush(output_len, output));
+ actual_size += result.bytes_written;
+ attempts++;
+ } while (attempts < 16 && result.should_retry);
Review comment:
I'm not sure why it's useful to attempt 16 times. The compressor
basically didn't compress anything, did it?
##########
File path: cpp/src/arrow/util/compression_zlib.cc
##########
@@ -246,7 +246,7 @@ class GZipCompressor : public Compressor {
// again with the same value of the flush parameter and more output space
// (updated avail_out), until the flush is complete (deflate returns
// with non-zero avail_out)."
- return FlushResult{bytes_written, (bytes_written == 0)};
+ return FlushResult{bytes_written, (stream_.avail_out == 0)};
Review comment:
Should we also check for `Z_BUF_ERROR` here?
##########
File path: cpp/src/arrow/util/compression_test.cc
##########
@@ -519,6 +519,44 @@ TEST_P(CodecTest, StreamingDecompressorReuse) {
CheckStreamingRoundtrip(compressor, decompressor, data);
}
+TEST_P(CodecTest, StreamingMultiFlush) {
+ // Regression test for ARROW-11937
+ if (GetCompression() == Compression::SNAPPY) {
+ GTEST_SKIP() << "snappy doesn't support streaming decompression";
+ }
+ if (GetCompression() == Compression::LZ4 ||
+ GetCompression() == Compression::LZ4_HADOOP) {
+ GTEST_SKIP() << "LZ4 raw format doesn't support streaming decompression.";
+ }
+ auto type = GetCompression();
+ ASSERT_OK_AND_ASSIGN(auto codec, Codec::Create(type));
+
+ std::shared_ptr<Compressor> compressor;
+ ASSERT_OK_AND_ASSIGN(compressor, codec->MakeCompressor());
+
+ // Grow the buffer and flush again while requested (up to a bounded number of times)
+ std::vector<uint8_t> compressed(1024);
+ Compressor::FlushResult result;
+ int attempts = 0;
+ int64_t actual_size = 0;
+ int64_t output_len = 0;
+ uint8_t* output = compressed.data();
+ do {
+ compressed.resize(compressed.capacity() * 2);
+ output_len = compressed.size() - actual_size;
+ output = compressed.data() + actual_size;
+ ASSERT_OK_AND_ASSIGN(result, compressor->Flush(output_len, output));
+ actual_size += result.bytes_written;
+ attempts++;
+ } while (attempts < 16 && result.should_retry);
Review comment:
I mean `attempts < 2` would sound sufficient.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]