This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new eab7d5f ARROW-1019: [C++] Implement compressed streams
eab7d5f is described below
commit eab7d5f1dc5eecdd0f52176de0508a8efcb72332
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Oct 18 15:34:35 2018 -0400
ARROW-1019: [C++] Implement compressed streams
Implement `CompressedInputStream` and `CompressedOutputStream` C++ classes.
Tested with gzip, brotli and zstd codecs.
I initially intended to expose the functionality in Python, but
`NativeFile` expects a `RandomAccessFile` in read mode (rather than a mere
`InputStream`).
Author: Antoine Pitrou <[email protected]>
Closes #2777 from pitrou/ARROW-1019-compressed-streams and squashes the
following commits:
64461f3e5 <Antoine Pitrou> Address review comments
0efd75ffb <Antoine Pitrou> Add tests for truncated and invalid data
77bce461f <Antoine Pitrou> Add Flush() tests
4066f2347 <Antoine Pitrou> Allow passing an explicit MemoryPool
9011a5d6e <Antoine Pitrou> Remove debug prints
d5582ca30 <Antoine Pitrou> Reduce test duration with Valgrind
9cde69bfd <Antoine Pitrou> ARROW-1019: Implement compressed streams
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/io/CMakeLists.txt | 2 +
cpp/src/arrow/io/compressed.cc | 399 +++++++++++++++++++++++++++++++++
cpp/src/arrow/io/compressed.h | 113 ++++++++++
cpp/src/arrow/io/io-compressed-test.cc | 243 ++++++++++++++++++++
5 files changed, 758 insertions(+)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 290b3c9..9a2f3a4 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -38,6 +38,7 @@ set(ARROW_SRCS
csv/reader.cc
io/buffered.cc
+ io/compressed.cc
io/file.cc
io/interfaces.cc
io/memory.cc
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index ff6b854..d21bb16 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -19,6 +19,7 @@
# arrow_io : Arrow IO interfaces
ADD_ARROW_TEST(io-buffered-test)
+ADD_ARROW_TEST(io-compressed-test)
ADD_ARROW_TEST(io-file-test)
if (ARROW_HDFS AND NOT ARROW_BOOST_HEADER_ONLY)
@@ -35,6 +36,7 @@ ADD_ARROW_BENCHMARK(io-memory-benchmark)
install(FILES
api.h
buffered.h
+ compressed.h
file.h
hdfs.h
interfaces.h
diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc
new file mode 100644
index 0000000..b0aabcb
--- /dev/null
+++ b/cpp/src/arrow/io/compressed.cc
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/compressed.h"
+
+#include <algorithm>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/util/compression.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using util::Codec;
+using util::Compressor;
+using util::Decompressor;
+
+namespace io {
+
+// ----------------------------------------------------------------------
+// CompressedOutputStream implementation
+
+// Private implementation of CompressedOutputStream.
+//
+// Bytes passed to Write() are fed to a streaming Compressor obtained from the
+// codec.  Compressed output accumulates in a resizable staging buffer
+// (initially kChunkSize bytes) and is forwarded to the wrapped raw stream when
+// the buffer fills up, on Flush() and on Close().  Write(), Flush() and Close()
+// each hold lock_ for their whole duration.
+class CompressedOutputStream::Impl {
+ public:
+  Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<OutputStream>& raw)
+      : pool_(pool), raw_(raw), codec_(codec), is_open_(true) {}
+
+  ~Impl() {
+    // Close() is called outside of the assertion so that the stream is
+    // finalized regardless of how DCHECK treats its argument.
+    const Status st = Close();
+    DCHECK(st.ok());
+    static_cast<void>(st);
+  }
+
+  // Create the compressor and allocate the staging buffer.
+  Status Init() {
+    RETURN_NOT_OK(codec_->MakeCompressor(&compressor_));
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, kChunkSize, &compressed_));
+    compressed_pos_ = 0;
+    return Status::OK();
+  }
+
+  // Always fails: the position is not tracked for compressed streams.
+  Status Tell(int64_t* position) const {
+    return Status::NotImplemented("Cannot tell() a compressed stream");
+  }
+
+  std::shared_ptr<OutputStream> raw() const { return raw_; }
+
+  // Forward the pending compressed bytes, if any, to the raw stream and mark
+  // the staging buffer as empty.  Does not take lock_.
+  Status FlushCompressed() {
+    if (compressed_pos_ > 0) {
+      RETURN_NOT_OK(raw_->Write(compressed_->data(), compressed_pos_));
+      compressed_pos_ = 0;
+    }
+    return Status::OK();
+  }
+
+  // Compress `nbytes` bytes starting at `data`.  Returns once all of the input
+  // has been consumed by the compressor; part of the output may still sit in
+  // the staging buffer afterwards.
+  Status Write(const void* data, int64_t nbytes) {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    auto input = reinterpret_cast<const uint8_t*>(data);
+    while (nbytes > 0) {
+      int64_t bytes_read;
+      RETURN_NOT_OK(CompressChunk(nbytes, input, &bytes_read));
+
+      if (bytes_read == 0 && compressed_pos_ > 0) {
+        // Not enough output room: empty the staging buffer and retry
+        RETURN_NOT_OK(FlushCompressed());
+        RETURN_NOT_OK(CompressChunk(nbytes, input, &bytes_read));
+      }
+      input += bytes_read;
+      nbytes -= bytes_read;
+      if (compressed_pos_ == compressed_->size()) {
+        // Output buffer full, flush it
+        RETURN_NOT_OK(FlushCompressed());
+      }
+      if (bytes_read == 0) {
+        // Still no progress: need to enlarge output buffer
+        RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+      }
+    }
+    return Status::OK();
+  }
+
+  // Flush the compressor's internal state and write everything produced so
+  // far to the raw stream.  The raw stream itself is not flushed.
+  Status Flush() {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    while (true) {
+      // Flush compressor
+      int64_t bytes_written;
+      bool should_retry;
+      int64_t output_len = compressed_->size() - compressed_pos_;
+      uint8_t* output = compressed_->mutable_data() + compressed_pos_;
+      RETURN_NOT_OK(
+          compressor_->Flush(output_len, output, &bytes_written, &should_retry));
+      compressed_pos_ += bytes_written;
+
+      // Flush compressed output
+      RETURN_NOT_OK(FlushCompressed());
+
+      if (should_retry) {
+        // Need to enlarge output buffer
+        RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+      } else {
+        break;
+      }
+    }
+    return Status::OK();
+  }
+
+  // End the compressed stream and write the remaining output to the raw
+  // stream.  Does not take lock_; called from Close().
+  Status FinalizeCompression() {
+    while (true) {
+      // Try to end compressor
+      int64_t bytes_written;
+      bool should_retry;
+      int64_t output_len = compressed_->size() - compressed_pos_;
+      uint8_t* output = compressed_->mutable_data() + compressed_pos_;
+      RETURN_NOT_OK(compressor_->End(output_len, output, &bytes_written, &should_retry));
+      compressed_pos_ += bytes_written;
+
+      // Flush compressed output
+      RETURN_NOT_OK(FlushCompressed());
+
+      if (should_retry) {
+        // Need to enlarge output buffer
+        RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+      } else {
+        // Done
+        break;
+      }
+    }
+    return Status::OK();
+  }
+
+  // Finalize the compressed stream and close the raw stream.  Calling Close()
+  // again is a no-op that returns OK.
+  Status Close() {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    if (!is_open_) {
+      return Status::OK();
+    }
+    is_open_ = false;
+
+    Status finalize_st;
+    // Init() may have failed before creating the compressor or the staging
+    // buffer (this object is still destroyed, hence closed, in that case):
+    // there is nothing to finalize then.
+    if (compressor_ != nullptr && compressed_ != nullptr) {
+      finalize_st = FinalizeCompression();
+    }
+    // Close the raw stream even if finalization failed, so that it is not
+    // left open; a finalization error takes precedence in the result.
+    const Status close_st = raw_->Close();
+    RETURN_NOT_OK(finalize_st);
+    return close_st;
+  }
+
+ private:
+  // Run the compressor on [input, input + input_len), writing into the free
+  // tail of the staging buffer.  compressed_pos_ is advanced by the number of
+  // bytes produced; *bytes_read receives the number of input bytes consumed.
+  Status CompressChunk(int64_t input_len, const uint8_t* input, int64_t* bytes_read) {
+    int64_t bytes_written;
+    const int64_t output_len = compressed_->size() - compressed_pos_;
+    uint8_t* output = compressed_->mutable_data() + compressed_pos_;
+    RETURN_NOT_OK(compressor_->Compress(input_len, input, output_len, output, bytes_read,
+                                        &bytes_written));
+    compressed_pos_ += bytes_written;
+    return Status::OK();
+  }
+
+  // Write 64 KB compressed data at a time
+  static const int64_t kChunkSize = 64 * 1024;
+
+  MemoryPool* pool_;
+  std::shared_ptr<OutputStream> raw_;
+  Codec* codec_;
+  bool is_open_;
+  std::shared_ptr<Compressor> compressor_;
+  // Staging buffer; bytes [0, compressed_pos_) are pending for the raw stream
+  std::shared_ptr<ResizableBuffer> compressed_;
+  int64_t compressed_pos_ = 0;
+
+  mutable std::mutex lock_;
+};
+
+// Convenience overload: same as the overload below, using the default memory
+// pool.
+Status CompressedOutputStream::Make(util::Codec* codec,
+                                    const std::shared_ptr<OutputStream>& raw,
+                                    std::shared_ptr<CompressedOutputStream>* out) {
+  MemoryPool* pool = default_memory_pool();
+  return Make(pool, codec, raw, out);
+}
+
+// Create a compressed output stream writing to `raw`, compressing with `codec`
+// and allocating its internal buffer from `pool`.  `*out` is only assigned
+// when initialization succeeds.
+Status CompressedOutputStream::Make(MemoryPool* pool, util::Codec* codec,
+                                    const std::shared_ptr<OutputStream>& raw,
+                                    std::shared_ptr<CompressedOutputStream>* out) {
+  // The constructor is private, so std::make_shared cannot be used here.
+  std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
+  // `raw` is a const reference: it is copied (std::move() on it would only
+  // have been a disguised copy).
+  res->impl_.reset(new Impl(pool, codec, raw));
+  RETURN_NOT_OK(res->impl_->Init());
+  *out = std::move(res);
+  return Status::OK();
+}
+
+// Defined out of line because Impl is only forward-declared in the header.
+CompressedOutputStream::~CompressedOutputStream() = default;
+
+// The methods below delegate to the implementation object.
+
+Status CompressedOutputStream::Close() {
+  return impl_->Close();
+}
+
+Status CompressedOutputStream::Tell(int64_t* position) const {
+  return impl_->Tell(position);
+}
+
+Status CompressedOutputStream::Write(const void* data, int64_t nbytes) {
+  return impl_->Write(data, nbytes);
+}
+
+// Delegates to the implementation object.
+Status CompressedOutputStream::Flush() { return impl_->Flush(); }
+
+// raw() is declared in compressed.h; without this definition any caller of
+// CompressedOutputStream::raw() fails to link.
+std::shared_ptr<OutputStream> CompressedOutputStream::raw() const {
+  return impl_->raw();
+}
+
+// ----------------------------------------------------------------------
+// CompressedInputStream implementation
+
+// Private implementation of CompressedInputStream.
+//
+// Compressed bytes are pulled from the wrapped raw stream in chunks of up to
+// kChunkSize bytes and run through a streaming Decompressor obtained from the
+// codec.  Decompressed bytes are staged in decompressed_ until Read() hands
+// them out.  Read() and Close() each hold lock_ for their whole duration.
+class CompressedInputStream::Impl {
+ public:
+  Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<InputStream>& raw)
+      : pool_(pool), raw_(raw), codec_(codec), is_open_(true) {}
+
+  // Create the decompressor.
+  Status Init() {
+    RETURN_NOT_OK(codec_->MakeDecompressor(&decompressor_));
+    return Status::OK();
+  }
+
+  ~Impl() {
+    // Close() is called outside of the assertion so that the raw stream is
+    // closed regardless of how DCHECK treats its argument.
+    const Status st = Close();
+    DCHECK(st.ok());
+    static_cast<void>(st);
+  }
+
+  // Close the raw stream.  Calling Close() again is a no-op that returns OK.
+  Status Close() {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (is_open_) {
+      is_open_ = false;
+      return raw_->Close();
+    } else {
+      return Status::OK();
+    }
+  }
+
+  // Always fails: the position is not tracked for compressed streams.
+  Status Tell(int64_t* position) const {
+    return Status::NotImplemented("Cannot tell() a compressed stream");
+  }
+
+  // Read compressed data if necessary: when the current compressed chunk is
+  // absent or fully consumed, replace it with up to kChunkSize bytes read
+  // from the raw stream.
+  Status EnsureCompressedData() {
+    int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0;
+    if (compressed_avail == 0) {
+      // No compressed data available, read a full chunk
+      RETURN_NOT_OK(raw_->Read(kChunkSize, &compressed_));
+      compressed_pos_ = 0;
+    }
+    return Status::OK();
+  }
+
+  // Decompress the unread part of compressed_ into a freshly allocated
+  // decompressed_ buffer, which is shrunk to the number of bytes produced.
+  // Requires compressed_ to be non-null.  The output buffer starts at
+  // kDecompressSize bytes and is doubled as long as the decompressor produces
+  // nothing while asking for more output room.
+  Status DecompressData() {
+    int64_t decompress_size = kDecompressSize;
+
+    while (true) {
+      RETURN_NOT_OK(AllocateResizableBuffer(pool_, decompress_size, &decompressed_));
+      decompressed_pos_ = 0;
+
+      bool need_more_output;
+      int64_t bytes_read, bytes_written;
+      int64_t input_len = compressed_->size() - compressed_pos_;
+      const uint8_t* input = compressed_->data() + compressed_pos_;
+      int64_t output_len = decompressed_->size();
+      uint8_t* output = decompressed_->mutable_data();
+
+      RETURN_NOT_OK(decompressor_->Decompress(input_len, input, output_len, output,
+                                              &bytes_read, &bytes_written,
+                                              &need_more_output));
+      compressed_pos_ += bytes_read;
+      if (bytes_written > 0 || !need_more_output || input_len == 0) {
+        RETURN_NOT_OK(decompressed_->Resize(bytes_written));
+        break;
+      }
+      DCHECK_EQ(bytes_written, 0);
+      // Need to enlarge output buffer
+      decompress_size *= 2;
+    }
+    return Status::OK();
+  }
+
+  // Copy up to `nbytes` decompressed bytes into `out` and store the number of
+  // bytes copied in `*bytes_read`.  Fewer bytes than requested are returned
+  // only once the decompressor reports the stream as finished.  Returns
+  // IOError("Truncated compressed stream") if the raw stream runs out of data
+  // before that point.
+  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    *bytes_read = 0;
+    auto out_data = reinterpret_cast<uint8_t*>(out);
+
+    while (nbytes > 0) {
+      int64_t avail = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0;
+      if (avail > 0) {
+        // Pending decompressed data is available, use it
+        avail = std::min(avail, nbytes);
+        memcpy(out_data, decompressed_->data() + decompressed_pos_, avail);
+        decompressed_pos_ += avail;
+        out_data += avail;
+        *bytes_read += avail;
+        nbytes -= avail;
+        if (decompressed_pos_ == decompressed_->size()) {
+          // Decompressed data is exhausted, release buffer
+          decompressed_.reset();
+        }
+        if (nbytes == 0) {
+          // We're done
+          break;
+        }
+      }
+
+      // At this point, no more decompressed data remains,
+      // so we need to decompress more
+      if (decompressor_->IsFinished()) {
+        break;
+      }
+      // First try to read data from the decompressor
+      if (compressed_) {
+        RETURN_NOT_OK(DecompressData());
+      }
+      if (!decompressed_ || decompressed_->size() == 0) {
+        // Got nothing, need to read more compressed data
+        RETURN_NOT_OK(EnsureCompressedData());
+        if (compressed_pos_ == compressed_->size()) {
+          // Compressed stream unexpectedly exhausted
+          return Status::IOError("Truncated compressed stream");
+        }
+        RETURN_NOT_OK(DecompressData());
+      }
+    }
+    return Status::OK();
+  }
+
+  // Same as above, but returns the data in a newly allocated buffer whose
+  // size is the number of bytes actually read.
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    std::shared_ptr<ResizableBuffer> buf;
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buf));
+    int64_t bytes_read;
+    RETURN_NOT_OK(Read(nbytes, &bytes_read, buf->mutable_data()));
+    RETURN_NOT_OK(buf->Resize(bytes_read));
+    *out = buf;
+    return Status::OK();
+  }
+
+  std::shared_ptr<InputStream> raw() const { return raw_; }
+
+ private:
+  // Read 64 KB compressed data at a time
+  static const int64_t kChunkSize = 64 * 1024;
+  // Decompress 1 MB at a time
+  static const int64_t kDecompressSize = 1024 * 1024;
+
+  MemoryPool* pool_;
+  std::shared_ptr<InputStream> raw_;
+  Codec* codec_;
+  bool is_open_;
+  std::shared_ptr<Decompressor> decompressor_;
+  // Current compressed chunk; bytes [0, compressed_pos_) are already consumed
+  std::shared_ptr<Buffer> compressed_;
+  int64_t compressed_pos_ = 0;
+  // Staged output; bytes [0, decompressed_pos_) were already handed out
+  std::shared_ptr<ResizableBuffer> decompressed_;
+  int64_t decompressed_pos_ = 0;
+
+  mutable std::mutex lock_;
+};
+
+// Convenience overload: same as the overload below, using the default memory
+// pool.
+Status CompressedInputStream::Make(Codec* codec, const std::shared_ptr<InputStream>& raw,
+                                   std::shared_ptr<CompressedInputStream>* out) {
+  MemoryPool* pool = default_memory_pool();
+  return Make(pool, codec, raw, out);
+}
+
+// Create a compressed input stream reading from `raw`, decompressing with
+// `codec` and allocating its buffers from `pool`.  `*out` is only assigned
+// when initialization succeeds.
+Status CompressedInputStream::Make(MemoryPool* pool, Codec* codec,
+                                   const std::shared_ptr<InputStream>& raw,
+                                   std::shared_ptr<CompressedInputStream>* out) {
+  // The constructor is private, so std::make_shared cannot be used here.
+  std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
+  // `raw` is a const reference: it is copied (std::move() on it would only
+  // have been a disguised copy).
+  res->impl_.reset(new Impl(pool, codec, raw));
+  RETURN_NOT_OK(res->impl_->Init());
+  *out = std::move(res);
+  return Status::OK();
+}
+
+// Defined out of line because Impl is only forward-declared in the header.
+CompressedInputStream::~CompressedInputStream() = default;
+
+// The methods below delegate to the implementation object.
+
+Status CompressedInputStream::Close() {
+  return impl_->Close();
+}
+
+Status CompressedInputStream::Tell(int64_t* position) const {
+  return impl_->Tell(position);
+}
+
+Status CompressedInputStream::Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+  return impl_->Read(nbytes, bytes_read, out);
+}
+
+Status CompressedInputStream::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  return impl_->Read(nbytes, out);
+}
+
+std::shared_ptr<InputStream> CompressedInputStream::raw() const {
+  return impl_->raw();
+}
+
+} // namespace io
+} // namespace arrow
diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h
new file mode 100644
index 0000000..bc9778f
--- /dev/null
+++ b/cpp/src/arrow/io/compressed.h
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Compressed stream implementations
+
+#ifndef ARROW_IO_COMPRESSED_H
+#define ARROW_IO_COMPRESSED_H
+
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+class Status;
+
+namespace util {
+
+class Codec;
+
+} // namespace util
+
+namespace io {
+
+/// \brief An output stream that compresses the data written to it and
+/// forwards the compressed bytes to an underlying raw output stream.
+class ARROW_EXPORT CompressedOutputStream : public OutputStream {
+ public:
+  ~CompressedOutputStream() override;
+
+  /// \brief Create a compressed output stream wrapping the given output stream.
+  ///
+  /// This overload uses the default memory pool.
+  static Status Make(util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
+                     std::shared_ptr<CompressedOutputStream>* out);
+
+  /// \brief Create a compressed output stream wrapping the given output stream,
+  /// allocating its internal buffer from the given memory pool.
+  static Status Make(MemoryPool* pool, util::Codec* codec,
+                     const std::shared_ptr<OutputStream>& raw,
+                     std::shared_ptr<CompressedOutputStream>* out);
+
+  // OutputStream interface
+
+  /// \brief Close the compressed output stream.  This implicitly closes the
+  /// underlying raw output stream.
+  Status Close() override;
+
+  /// \brief Not supported: always returns a NotImplemented status.
+  Status Tell(int64_t* position) const override;
+
+  /// \brief Compress nbytes bytes of data.
+  Status Write(const void* data, int64_t nbytes) override;
+
+  /// \brief Flush the compressor and write all compressed data produced so far
+  /// to the underlying raw output stream.
+  Status Flush() override;
+
+  /// \brief Return the underlying raw output stream.
+  std::shared_ptr<OutputStream> raw() const;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(CompressedOutputStream);
+
+  // Instances are created through Make()
+  CompressedOutputStream() = default;
+
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+/// \brief An input stream that reads compressed bytes from an underlying raw
+/// input stream and returns the decompressed data.
+class ARROW_EXPORT CompressedInputStream : public InputStream {
+ public:
+  ~CompressedInputStream() override;
+
+  /// \brief Create a compressed input stream wrapping the given input stream.
+  ///
+  /// This overload uses the default memory pool.
+  static Status Make(util::Codec* codec, const std::shared_ptr<InputStream>& raw,
+                     std::shared_ptr<CompressedInputStream>* out);
+
+  /// \brief Create a compressed input stream wrapping the given input stream,
+  /// allocating its internal buffers from the given memory pool.
+  static Status Make(MemoryPool* pool, util::Codec* codec,
+                     const std::shared_ptr<InputStream>& raw,
+                     std::shared_ptr<CompressedInputStream>* out);
+
+  // InputStream interface
+
+  /// \brief Close the compressed input stream.  This implicitly closes the
+  /// underlying raw input stream.
+  Status Close() override;
+
+  /// \brief Not supported: always returns a NotImplemented status.
+  Status Tell(int64_t* position) const override;
+
+  /// \brief Read up to nbytes decompressed bytes into out; the number of bytes
+  /// actually read is stored in bytes_read.
+  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override;
+
+  /// \brief Read up to nbytes decompressed bytes into a newly allocated buffer.
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  /// \brief Return the underlying raw input stream.
+  std::shared_ptr<InputStream> raw() const;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(CompressedInputStream);
+
+  // Instances are created through Make()
+  CompressedInputStream() = default;
+
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+} // namespace io
+} // namespace arrow
+
+#endif // ARROW_IO_COMPRESSED_H
diff --git a/cpp/src/arrow/io/io-compressed-test.cc
b/cpp/src/arrow/io/io-compressed-test.cc
new file mode 100644
index 0000000..78240da
--- /dev/null
+++ b/cpp/src/arrow/io/io-compressed-test.cc
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/buffer.h"
+#include "arrow/io/compressed.h"
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+#include "arrow/util/compression.h"
+
+namespace arrow {
+namespace io {
+
+using ::arrow::util::Codec;
+
+#ifdef ARROW_VALGRIND
+// Avoid slowing down tests too much with Valgrind
+static constexpr int64_t RANDOM_DATA_SIZE = 50 * 1024;
+static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 120 * 1024;
+#else
+// The data should be large enough to exercise internal buffers
+static constexpr int64_t RANDOM_DATA_SIZE = 3 * 1024 * 1024;
+static constexpr int64_t COMPRESSIBLE_DATA_SIZE = 8 * 1024 * 1024;
+#endif
+
+// Return `data_size` bytes filled by random_bytes() with the fixed seed 1234.
+std::vector<uint8_t> MakeRandomData(int data_size) {
+  const uint32_t kSeed = 1234;
+  std::vector<uint8_t> bytes(data_size);
+  random_bytes(data_size, kSeed, bytes.data());
+  return bytes;
+}
+
+// Return `data_size` bytes made of a short English sentence repeated over and
+// over, i.e. data that compresses very well.  A non-positive `data_size`
+// yields an empty vector.
+std::vector<uint8_t> MakeCompressibleData(int data_size) {
+  // The literal must stay on a single source line: a line break inside a
+  // string literal does not compile.
+  const std::string base_data =
+      "Apache Arrow is a cross-language development platform for in-memory data";
+
+  if (data_size <= 0) {
+    return {};
+  }
+  // Fill the output in place, cycling through the base sentence; all index
+  // arithmetic is done in size_t to avoid mixing signed and unsigned values.
+  std::vector<uint8_t> data(static_cast<size_t>(data_size));
+  for (size_t i = 0; i < data.size(); ++i) {
+    data[i] = static_cast<uint8_t>(base_data[i % base_data.size()]);
+  }
+  return data;
+}
+
+// Compress `data` in a single codec->Compress() call and return a buffer
+// holding exactly the compressed bytes.  Aborts on any error.
+std::shared_ptr<Buffer> CompressDataOneShot(Codec* codec,
+                                            const std::vector<uint8_t>& data) {
+  // Allocate for the worst case, then shrink to what was actually produced
+  const int64_t capacity = codec->MaxCompressedLen(data.size(), data.data());
+  std::shared_ptr<ResizableBuffer> result;
+  ABORT_NOT_OK(AllocateResizableBuffer(capacity, &result));
+
+  int64_t actual_len;
+  ABORT_NOT_OK(codec->Compress(data.size(), data.data(), capacity,
+                               result->mutable_data(), &actual_len));
+  ABORT_NOT_OK(result->Resize(actual_len));
+  return result;
+}
+
+// Decompress `compressed` through a CompressedInputStream, reading 1111 bytes
+// at a time until a read returns no data.  On success `*out` receives the
+// decompressed bytes; on failure `*out` is left untouched.
+Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> compressed,
+                                std::vector<uint8_t>* out) {
+  const int64_t kReadSize = 1111;
+
+  // Create compressed input stream
+  auto source = std::make_shared<BufferReader>(compressed);
+  std::shared_ptr<CompressedInputStream> stream;
+  RETURN_NOT_OK(CompressedInputStream::Make(codec, source, &stream));
+
+  std::vector<uint8_t> result;
+  for (;;) {
+    std::shared_ptr<Buffer> piece;
+    RETURN_NOT_OK(stream->Read(kReadSize, &piece));
+    if (piece->size() == 0) {
+      // EOF
+      break;
+    }
+    result.insert(result.end(), piece->data(), piece->data() + piece->size());
+  }
+  *out = std::move(result);
+  return Status::OK();
+}
+
+// Round trip check: compress `data` in one shot, decompress it through a
+// CompressedInputStream and expect to get `data` back.
+void CheckCompressedInputStream(Codec* codec, const std::vector<uint8_t>& data) {
+  const std::shared_ptr<Buffer> compressed = CompressDataOneShot(codec, data);
+
+  std::vector<uint8_t> roundtripped;
+  ASSERT_OK(RunCompressedInputStream(codec, compressed, &roundtripped));
+
+  // Compare sizes first for a more readable failure message
+  ASSERT_EQ(roundtripped.size(), data.size());
+  ASSERT_EQ(roundtripped, data);
+}
+
+// Round trip check: write `data` through a CompressedOutputStream in writes of
+// at most 1111 bytes (calling Flush() after each write if `do_flush` is
+// true), then decompress the result in one shot and expect to get `data` back.
+void CheckCompressedOutputStream(Codec* codec, const std::vector<uint8_t>& data,
+                                 bool do_flush) {
+  const int64_t kWriteSize = 1111;
+
+  // Create compressed output stream
+  std::shared_ptr<BufferOutputStream> sink;
+  ASSERT_OK(BufferOutputStream::Create(1024, default_memory_pool(), &sink));
+  std::shared_ptr<CompressedOutputStream> stream;
+  ASSERT_OK(CompressedOutputStream::Make(codec, sink, &stream));
+
+  const int64_t total = static_cast<int64_t>(data.size());
+  for (int64_t offset = 0; offset < total; offset += kWriteSize) {
+    const int64_t len = std::min(kWriteSize, total - offset);
+    ASSERT_OK(stream->Write(data.data() + offset, len));
+    if (do_flush) {
+      ASSERT_OK(stream->Flush());
+    }
+  }
+  ASSERT_OK(stream->Close());
+
+  // Get compressed data and decompress it
+  std::shared_ptr<Buffer> compressed;
+  ASSERT_OK(sink->Finish(&compressed));
+  std::vector<uint8_t> roundtripped(data.size());
+  ASSERT_OK(codec->Decompress(compressed->size(), compressed->data(),
+                              roundtripped.size(), roundtripped.data()));
+  ASSERT_EQ(roundtripped, data);
+}
+
+// Fixture for the input stream tests, parameterized on the compression type.
+class CompressedInputStreamTest : public ::testing::TestWithParam<Compression::type> {
+ protected:
+  Compression::type GetCompression() { return GetParam(); }
+
+  // Create a codec for the compression type under test; aborts on failure.
+  std::unique_ptr<Codec> MakeCodec() {
+    std::unique_ptr<Codec> result;
+    ABORT_NOT_OK(Codec::Create(GetCompression(), &result));
+    return result;
+  }
+};
+
+// Round trip of highly repetitive data.
+TEST_P(CompressedInputStreamTest, CompressibleData) {
+  const std::vector<uint8_t> payload = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
+  const std::unique_ptr<Codec> codec = MakeCodec();
+
+  CheckCompressedInputStream(codec.get(), payload);
+}
+
+// Round trip of random bytes.
+TEST_P(CompressedInputStreamTest, RandomData) {
+  const std::vector<uint8_t> payload = MakeRandomData(RANDOM_DATA_SIZE);
+  const std::unique_ptr<Codec> codec = MakeCodec();
+
+  CheckCompressedInputStream(codec.get(), payload);
+}
+
+// Reading a compressed stream whose last 3 bytes are missing must fail with
+// an IOError.
+TEST_P(CompressedInputStreamTest, TruncatedData) {
+  const std::unique_ptr<Codec> codec = MakeCodec();
+  const std::vector<uint8_t> payload = MakeRandomData(10000);
+  const std::shared_ptr<Buffer> whole = CompressDataOneShot(codec.get(), payload);
+  const int64_t truncated_size = whole->size() - 3;
+  auto truncated = SliceBuffer(whole, 0, truncated_size);
+
+  std::vector<uint8_t> sink;
+  ASSERT_RAISES(IOError, RunCompressedInputStream(codec.get(), truncated, &sink));
+}
+
+// Random bytes passed off as compressed data: creating the stream succeeds,
+// but the first read must fail with an IOError.
+TEST_P(CompressedInputStreamTest, InvalidData) {
+  const std::unique_ptr<Codec> codec = MakeCodec();
+  auto garbage = MakeRandomData(10000);
+
+  auto source = std::make_shared<BufferReader>(Buffer::Wrap(garbage));
+  std::shared_ptr<CompressedInputStream> stream;
+  ASSERT_OK(CompressedInputStream::Make(codec.get(), source, &stream));
+
+  std::shared_ptr<Buffer> chunk;
+  ASSERT_RAISES(IOError, stream->Read(1024, &chunk));
+}
+
+// NOTE: Snappy is not instantiated below: it doesn't support streaming decompression
+
+// NOTE: LZ4 is not instantiated below: its streaming decompression uses the LZ4
+// framing format, which must be tested against a streaming compressor
+
+INSTANTIATE_TEST_CASE_P(TestGZipInputStream, CompressedInputStreamTest,
+ ::testing::Values(Compression::GZIP));
+
+INSTANTIATE_TEST_CASE_P(TestZSTDInputStream, CompressedInputStreamTest,
+ ::testing::Values(Compression::ZSTD));
+
+INSTANTIATE_TEST_CASE_P(TestBrotliInputStream, CompressedInputStreamTest,
+ ::testing::Values(Compression::BROTLI));
+
+// Fixture for the output stream tests, parameterized on the compression type.
+class CompressedOutputStreamTest : public ::testing::TestWithParam<Compression::type> {
+ protected:
+  Compression::type GetCompression() { return GetParam(); }
+
+  // Create a codec for the compression type under test; aborts on failure.
+  std::unique_ptr<Codec> MakeCodec() {
+    std::unique_ptr<Codec> result;
+    ABORT_NOT_OK(Codec::Create(GetCompression(), &result));
+    return result;
+  }
+};
+
+// Round trip of highly repetitive data, first without and then with a Flush()
+// after each write.
+TEST_P(CompressedOutputStreamTest, CompressibleData) {
+  const std::vector<uint8_t> payload = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
+  const std::unique_ptr<Codec> codec = MakeCodec();
+
+  CheckCompressedOutputStream(codec.get(), payload, false /* do_flush */);
+  CheckCompressedOutputStream(codec.get(), payload, true /* do_flush */);
+}
+
+// Round trip of random bytes, first without and then with a Flush() after
+// each write.
+TEST_P(CompressedOutputStreamTest, RandomData) {
+  const std::vector<uint8_t> payload = MakeRandomData(RANDOM_DATA_SIZE);
+  const std::unique_ptr<Codec> codec = MakeCodec();
+
+  CheckCompressedOutputStream(codec.get(), payload, false /* do_flush */);
+  CheckCompressedOutputStream(codec.get(), payload, true /* do_flush */);
+}
+
+// Run the output stream tests once for each of the gzip, zstd and brotli codecs.
+INSTANTIATE_TEST_CASE_P(TestGZipOutputStream, CompressedOutputStreamTest,
+ ::testing::Values(Compression::GZIP));
+
+INSTANTIATE_TEST_CASE_P(TestZSTDOutputStream, CompressedOutputStreamTest,
+ ::testing::Values(Compression::ZSTD));
+
+INSTANTIATE_TEST_CASE_P(TestBrotliOutputStream, CompressedOutputStreamTest,
+ ::testing::Values(Compression::BROTLI));
+
+} // namespace io
+} // namespace arrow