This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new eab7d5f  ARROW-1019: [C++] Implement compressed streams
eab7d5f is described below

commit eab7d5f1dc5eecdd0f52176de0508a8efcb72332
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Oct 18 15:34:35 2018 -0400

    ARROW-1019: [C++] Implement compressed streams
    
    Implement the `CompressedInputStream` and `CompressedOutputStream` C++ classes.
    Tested with the gzip, brotli and zstd codecs.
    
    I initially intended to expose this functionality in Python as well, but
    `NativeFile` in read mode expects a `RandomAccessFile` rather than a plain
    `InputStream`.
    
    Author: Antoine Pitrou <[email protected]>
    
    Closes #2777 from pitrou/ARROW-1019-compressed-streams and squashes the 
following commits:
    
    64461f3e5 <Antoine Pitrou> Address review comments
    0efd75ffb <Antoine Pitrou> Add tests for truncated and invalid data
    77bce461f <Antoine Pitrou> Add Flush() tests
    4066f2347 <Antoine Pitrou> Allow passing an explicit MemoryPool
    9011a5d6e <Antoine Pitrou> Remove debug prints
    d5582ca30 <Antoine Pitrou> Reduce test duration with Valgrind
    9cde69bfd <Antoine Pitrou> ARROW-1019:  Implement compressed streams
---
 cpp/src/arrow/CMakeLists.txt           |   1 +
 cpp/src/arrow/io/CMakeLists.txt        |   2 +
 cpp/src/arrow/io/compressed.cc         | 399 +++++++++++++++++++++++++++++++++
 cpp/src/arrow/io/compressed.h          | 113 ++++++++++
 cpp/src/arrow/io/io-compressed-test.cc | 243 ++++++++++++++++++++
 5 files changed, 758 insertions(+)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 290b3c9..9a2f3a4 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -38,6 +38,7 @@ set(ARROW_SRCS
   csv/reader.cc
 
   io/buffered.cc
+  io/compressed.cc
   io/file.cc
   io/interfaces.cc
   io/memory.cc
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index ff6b854..d21bb16 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -19,6 +19,7 @@
 # arrow_io : Arrow IO interfaces
 
 ADD_ARROW_TEST(io-buffered-test)
+ADD_ARROW_TEST(io-compressed-test)
 ADD_ARROW_TEST(io-file-test)
 
 if (ARROW_HDFS AND NOT ARROW_BOOST_HEADER_ONLY)
@@ -35,6 +36,7 @@ ADD_ARROW_BENCHMARK(io-memory-benchmark)
 install(FILES
   api.h
   buffered.h
+  compressed.h
   file.h
   hdfs.h
   interfaces.h
diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc
new file mode 100644
index 0000000..b0aabcb
--- /dev/null
+++ b/cpp/src/arrow/io/compressed.cc
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/compressed.h"
+
+#include <algorithm>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/util/compression.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+using util::Codec;
+using util::Compressor;
+using util::Decompressor;
+
+namespace io {
+
+// ----------------------------------------------------------------------
+// CompressedOutputStream implementation
+
+class CompressedOutputStream::Impl {
+ public:
+  Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<OutputStream>& 
raw)
+      : pool_(pool), raw_(raw), codec_(codec), is_open_(true) {}
+
+  ~Impl() { DCHECK(Close().ok()); }
+
+  Status Init() {
+    RETURN_NOT_OK(codec_->MakeCompressor(&compressor_));
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, kChunkSize, &compressed_));
+    compressed_pos_ = 0;
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) const {
+    return Status::NotImplemented("Cannot tell() a compressed stream");
+  }
+
+  std::shared_ptr<OutputStream> raw() const { return raw_; }
+
+  Status FlushCompressed() {
+    if (compressed_pos_ > 0) {
+      RETURN_NOT_OK(raw_->Write(compressed_->data(), compressed_pos_));
+      compressed_pos_ = 0;
+    }
+    return Status::OK();
+  }
+
+  Status Write(const void* data, int64_t nbytes) {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    auto input = reinterpret_cast<const uint8_t*>(data);
+    while (nbytes > 0) {
+      int64_t bytes_read, bytes_written;
+      int64_t input_len = nbytes;
+      int64_t output_len = compressed_->size() - compressed_pos_;
+      uint8_t* output = compressed_->mutable_data() + compressed_pos_;
+      RETURN_NOT_OK(compressor_->Compress(input_len, input, output_len, output,
+                                          &bytes_read, &bytes_written));
+      compressed_pos_ += bytes_written;
+
+      if (bytes_read == 0) {
+        // Not enough output, try to flush it and retry
+        if (compressed_pos_ > 0) {
+          RETURN_NOT_OK(FlushCompressed());
+          output_len = compressed_->size() - compressed_pos_;
+          output = compressed_->mutable_data() + compressed_pos_;
+          RETURN_NOT_OK(compressor_->Compress(input_len, input, output_len, 
output,
+                                              &bytes_read, &bytes_written));
+          compressed_pos_ += bytes_written;
+        }
+      }
+      input += bytes_read;
+      nbytes -= bytes_read;
+      if (compressed_pos_ == compressed_->size()) {
+        // Output buffer full, flush it
+        RETURN_NOT_OK(FlushCompressed());
+      }
+      if (bytes_read == 0) {
+        // Need to enlarge output buffer
+        RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Flush() {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    while (true) {
+      // Flush compressor
+      int64_t bytes_written;
+      bool should_retry;
+      int64_t output_len = compressed_->size() - compressed_pos_;
+      uint8_t* output = compressed_->mutable_data() + compressed_pos_;
+      RETURN_NOT_OK(
+          compressor_->Flush(output_len, output, &bytes_written, 
&should_retry));
+      compressed_pos_ += bytes_written;
+
+      // Flush compressed output
+      RETURN_NOT_OK(FlushCompressed());
+
+      if (should_retry) {
+        // Need to enlarge output buffer
+        RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+      } else {
+        break;
+      }
+    }
+    return Status::OK();
+  }
+
+  Status FinalizeCompression() {
+    while (true) {
+      // Try to end compressor
+      int64_t bytes_written;
+      bool should_retry;
+      int64_t output_len = compressed_->size() - compressed_pos_;
+      uint8_t* output = compressed_->mutable_data() + compressed_pos_;
+      RETURN_NOT_OK(compressor_->End(output_len, output, &bytes_written, 
&should_retry));
+      compressed_pos_ += bytes_written;
+
+      // Flush compressed output
+      RETURN_NOT_OK(FlushCompressed());
+
+      if (should_retry) {
+        // Need to enlarge output buffer
+        RETURN_NOT_OK(compressed_->Resize(compressed_->size() * 2));
+      } else {
+        // Done
+        break;
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Close() {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    if (is_open_) {
+      is_open_ = false;
+      RETURN_NOT_OK(FinalizeCompression());
+      return raw_->Close();
+    } else {
+      return Status::OK();
+    }
+  }
+
+ private:
+  // Write 64 KB compressed data at a time
+  static const int64_t kChunkSize = 64 * 1024;
+
+  MemoryPool* pool_;
+  std::shared_ptr<OutputStream> raw_;
+  Codec* codec_;
+  bool is_open_;
+  std::shared_ptr<Compressor> compressor_;
+  std::shared_ptr<ResizableBuffer> compressed_;
+  int64_t compressed_pos_;
+
+  mutable std::mutex lock_;
+};
+
+Status CompressedOutputStream::Make(util::Codec* codec,
+                                    const std::shared_ptr<OutputStream>& raw,
+                                    std::shared_ptr<CompressedOutputStream>* 
out) {
+  return Make(default_memory_pool(), codec, raw, out);
+}
+
+Status CompressedOutputStream::Make(MemoryPool* pool, util::Codec* codec,
+                                    const std::shared_ptr<OutputStream>& raw,
+                                    std::shared_ptr<CompressedOutputStream>* 
out) {
+  std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
+  res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw)));
+  RETURN_NOT_OK(res->impl_->Init());
+  *out = res;
+  return Status::OK();
+}
+
+CompressedOutputStream::~CompressedOutputStream() {}
+
+Status CompressedOutputStream::Close() { return impl_->Close(); }
+
+Status CompressedOutputStream::Tell(int64_t* position) const {
+  return impl_->Tell(position);
+}
+
+Status CompressedOutputStream::Write(const void* data, int64_t nbytes) {
+  return impl_->Write(data, nbytes);
+}
+
+Status CompressedOutputStream::Flush() { return impl_->Flush(); }
+
+// ----------------------------------------------------------------------
+// CompressedInputStream implementation
+
+class CompressedInputStream::Impl {
+ public:
+  Impl(MemoryPool* pool, Codec* codec, const std::shared_ptr<InputStream>& raw)
+      : pool_(pool), raw_(raw), codec_(codec), is_open_(true) {}
+
+  Status Init() {
+    RETURN_NOT_OK(codec_->MakeDecompressor(&decompressor_));
+    return Status::OK();
+  }
+
+  ~Impl() { DCHECK(Close().ok()); }
+
+  Status Close() {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (is_open_) {
+      is_open_ = false;
+      return raw_->Close();
+    } else {
+      return Status::OK();
+    }
+  }
+
+  Status Tell(int64_t* position) const {
+    return Status::NotImplemented("Cannot tell() a compressed stream");
+  }
+
+  // Read compressed data if necessary
+  Status EnsureCompressedData() {
+    int64_t compressed_avail = compressed_ ? compressed_->size() - 
compressed_pos_ : 0;
+    if (compressed_avail == 0) {
+      // No compressed data available, read a full chunk
+      RETURN_NOT_OK(raw_->Read(kChunkSize, &compressed_));
+      compressed_pos_ = 0;
+    }
+    return Status::OK();
+  }
+
+  Status DecompressData() {
+    int64_t decompress_size = kDecompressSize;
+
+    while (true) {
+      RETURN_NOT_OK(AllocateResizableBuffer(pool_, decompress_size, 
&decompressed_));
+      decompressed_pos_ = 0;
+
+      bool need_more_output;
+      int64_t bytes_read, bytes_written;
+      int64_t input_len = compressed_->size() - compressed_pos_;
+      const uint8_t* input = compressed_->data() + compressed_pos_;
+      int64_t output_len = decompressed_->size();
+      uint8_t* output = decompressed_->mutable_data();
+
+      RETURN_NOT_OK(decompressor_->Decompress(input_len, input, output_len, 
output,
+                                              &bytes_read, &bytes_written,
+                                              &need_more_output));
+      compressed_pos_ += bytes_read;
+      if (bytes_written > 0 || !need_more_output || input_len == 0) {
+        RETURN_NOT_OK(decompressed_->Resize(bytes_written));
+        break;
+      }
+      DCHECK_EQ(bytes_written, 0);
+      // Need to enlarge output buffer
+      decompress_size *= 2;
+    }
+    return Status::OK();
+  }
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    *bytes_read = 0;
+    auto out_data = reinterpret_cast<uint8_t*>(out);
+
+    while (nbytes > 0) {
+      int64_t avail = decompressed_ ? (decompressed_->size() - 
decompressed_pos_) : 0;
+      if (avail > 0) {
+        // Pending decompressed data is available, use it
+        avail = std::min(avail, nbytes);
+        memcpy(out_data, decompressed_->data() + decompressed_pos_, avail);
+        decompressed_pos_ += avail;
+        out_data += avail;
+        *bytes_read += avail;
+        nbytes -= avail;
+        if (decompressed_pos_ == decompressed_->size()) {
+          // Decompressed data is exhausted, release buffer
+          decompressed_.reset();
+        }
+        if (nbytes == 0) {
+          // We're done
+          break;
+        }
+      }
+
+      // At this point, no more decompressed data remains,
+      // so we need to decompress more
+      if (decompressor_->IsFinished()) {
+        break;
+      }
+      // First try to read data from the decompressor
+      if (compressed_) {
+        RETURN_NOT_OK(DecompressData());
+      }
+      if (!decompressed_ || decompressed_->size() == 0) {
+        // Got nothing, need to read more compressed data
+        RETURN_NOT_OK(EnsureCompressedData());
+        if (compressed_pos_ == compressed_->size()) {
+          // Compressed stream unexpectedly exhausted
+          return Status::IOError("Truncated compressed stream");
+        }
+        RETURN_NOT_OK(DecompressData());
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    std::shared_ptr<ResizableBuffer> buf;
+    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buf));
+    int64_t bytes_read;
+    RETURN_NOT_OK(Read(nbytes, &bytes_read, buf->mutable_data()));
+    RETURN_NOT_OK(buf->Resize(bytes_read));
+    *out = buf;
+    return Status::OK();
+  }
+
+  std::shared_ptr<InputStream> raw() const { return raw_; }
+
+ private:
+  // Read 64 KB compressed data at a time
+  static const int64_t kChunkSize = 64 * 1024;
+  // Decompress 1 MB at a time
+  static const int64_t kDecompressSize = 1024 * 1024;
+
+  MemoryPool* pool_;
+  std::shared_ptr<InputStream> raw_;
+  Codec* codec_;
+  bool is_open_;
+  std::shared_ptr<Decompressor> decompressor_;
+  std::shared_ptr<Buffer> compressed_;
+  int64_t compressed_pos_;
+  std::shared_ptr<ResizableBuffer> decompressed_;
+  int64_t decompressed_pos_;
+
+  mutable std::mutex lock_;
+};
+
+Status CompressedInputStream::Make(Codec* codec, const 
std::shared_ptr<InputStream>& raw,
+                                   std::shared_ptr<CompressedInputStream>* 
out) {
+  return Make(default_memory_pool(), codec, raw, out);
+}
+
+Status CompressedInputStream::Make(MemoryPool* pool, Codec* codec,
+                                   const std::shared_ptr<InputStream>& raw,
+                                   std::shared_ptr<CompressedInputStream>* 
out) {
+  std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
+  res->impl_ = std::unique_ptr<Impl>(new Impl(pool, codec, std::move(raw)));
+  RETURN_NOT_OK(res->impl_->Init());
+  *out = res;
+  return Status::OK();
+}
+
+CompressedInputStream::~CompressedInputStream() {}
+
+Status CompressedInputStream::Close() { return impl_->Close(); }
+
+Status CompressedInputStream::Tell(int64_t* position) const {
+  return impl_->Tell(position);
+}
+
+Status CompressedInputStream::Read(int64_t nbytes, int64_t* bytes_read, void* 
out) {
+  return impl_->Read(nbytes, bytes_read, out);
+}
+
+Status CompressedInputStream::Read(int64_t nbytes, std::shared_ptr<Buffer>* 
out) {
+  return impl_->Read(nbytes, out);
+}
+
+std::shared_ptr<InputStream> CompressedInputStream::raw() const { return 
impl_->raw(); }
+
+}  // namespace io
+}  // namespace arrow
diff --git a/cpp/src/arrow/io/compressed.h b/cpp/src/arrow/io/compressed.h
new file mode 100644
index 0000000..bc9778f
--- /dev/null
+++ b/cpp/src/arrow/io/compressed.h
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Compressed stream implementations
+
+#ifndef ARROW_IO_COMPRESSED_H
+#define ARROW_IO_COMPRESSED_H
+
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+class Status;
+
+namespace util {
+
+class Codec;
+
+}  // namespace util
+
+namespace io {
+
+class ARROW_EXPORT CompressedOutputStream : public OutputStream {
+ public:
+  ~CompressedOutputStream() override;
+
+  /// \brief Create a compressed output stream wrapping the given output 
stream.
+  static Status Make(util::Codec* codec, const std::shared_ptr<OutputStream>& 
raw,
+                     std::shared_ptr<CompressedOutputStream>* out);
+  static Status Make(MemoryPool* pool, util::Codec* codec,
+                     const std::shared_ptr<OutputStream>& raw,
+                     std::shared_ptr<CompressedOutputStream>* out);
+
+  // OutputStream interface
+
+  /// \brief Close the compressed output stream.  This implicitly closes the
+  /// underlying raw output stream.
+  Status Close() override;
+
+  Status Tell(int64_t* position) const override;
+
+  Status Write(const void* data, int64_t nbytes) override;
+  Status Flush() override;
+
+  /// \brief Return the underlying raw output stream.
+  std::shared_ptr<OutputStream> raw() const;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(CompressedOutputStream);
+
+  CompressedOutputStream() = default;
+
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+class ARROW_EXPORT CompressedInputStream : public InputStream {
+ public:
+  ~CompressedInputStream() override;
+
+  /// \brief Create a compressed input stream wrapping the given input stream.
+  static Status Make(util::Codec* codec, const std::shared_ptr<InputStream>& 
raw,
+                     std::shared_ptr<CompressedInputStream>* out);
+  static Status Make(MemoryPool* pool, util::Codec* codec,
+                     const std::shared_ptr<InputStream>& raw,
+                     std::shared_ptr<CompressedInputStream>* out);
+
+  // InputStream interface
+
+  /// \brief Close the compressed input stream.  This implicitly closes the
+  /// underlying raw input stream.
+  Status Close() override;
+
+  Status Tell(int64_t* position) const override;
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override;
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  /// \brief Return the underlying raw input stream.
+  std::shared_ptr<InputStream> raw() const;
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(CompressedInputStream);
+
+  CompressedInputStream() = default;
+
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_COMPRESSED_H
diff --git a/cpp/src/arrow/io/io-compressed-test.cc 
b/cpp/src/arrow/io/io-compressed-test.cc
new file mode 100644
index 0000000..78240da
--- /dev/null
+++ b/cpp/src/arrow/io/io-compressed-test.cc
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/buffer.h"
#include "arrow/io/compressed.h"
#include "arrow/io/memory.h"
#include "arrow/io/test-common.h"
#include "arrow/status.h"
#include "arrow/test-util.h"
#include "arrow/util/compression.h"
+
+namespace arrow {
+namespace io {
+
+using ::arrow::util::Codec;
+
// Sizes, in bytes, of the generated test payloads.  They should be large enough
// to exercise the streams' internal buffers; under Valgrind they are reduced to
// avoid slowing down tests too much.
static constexpr int64_t RANDOM_DATA_SIZE =
#ifdef ARROW_VALGRIND
    50 * 1024;
#else
    3 * 1024 * 1024;
#endif

static constexpr int64_t COMPRESSIBLE_DATA_SIZE =
#ifdef ARROW_VALGRIND
    120 * 1024;
#else
    8 * 1024 * 1024;
#endif
+
+std::vector<uint8_t> MakeRandomData(int data_size) {
+  std::vector<uint8_t> data(data_size);
+  random_bytes(data_size, 1234, data.data());
+  return data;
+}
+
// Return `data_size` bytes made of a short English sentence repeated over and
// over, with the last repetition truncated as needed.  This gives highly
// redundant input.  Takes int64_t because callers pass int64_t sizes.
std::vector<uint8_t> MakeCompressibleData(int64_t data_size) {
  const std::string base_data =
      "Apache Arrow is a cross-language development platform for in-memory data";
  const size_t base_size = base_data.size();
  const size_t total_size = static_cast<size_t>(data_size);

  // Fill the output directly, one (possibly partial) repetition at a time,
  // instead of over-allocating and truncating afterwards.
  std::vector<uint8_t> data(total_size);
  for (size_t pos = 0; pos < total_size; pos += base_size) {
    std::memcpy(data.data() + pos, base_data.data(),
                std::min(base_size, total_size - pos));
  }
  return data;
}
+
+std::shared_ptr<Buffer> CompressDataOneShot(Codec* codec,
+                                            const std::vector<uint8_t>& data) {
+  int64_t max_compressed_len, compressed_len;
+  max_compressed_len = codec->MaxCompressedLen(data.size(), data.data());
+  std::shared_ptr<ResizableBuffer> compressed;
+  ABORT_NOT_OK(AllocateResizableBuffer(max_compressed_len, &compressed));
+  ABORT_NOT_OK(codec->Compress(data.size(), data.data(), max_compressed_len,
+                               compressed->mutable_data(), &compressed_len));
+  ABORT_NOT_OK(compressed->Resize(compressed_len));
+  return compressed;
+}
+
+Status RunCompressedInputStream(Codec* codec, std::shared_ptr<Buffer> 
compressed,
+                                std::vector<uint8_t>* out) {
+  // Create compressed input stream
+  auto buffer_reader = std::make_shared<BufferReader>(compressed);
+  std::shared_ptr<CompressedInputStream> stream;
+  RETURN_NOT_OK(CompressedInputStream::Make(codec, buffer_reader, &stream));
+
+  std::vector<uint8_t> decompressed;
+  int64_t decompressed_size = 0;
+  const int64_t chunk_size = 1111;
+  while (true) {
+    std::shared_ptr<Buffer> buf;
+    RETURN_NOT_OK(stream->Read(chunk_size, &buf));
+    if (buf->size() == 0) {
+      // EOF
+      break;
+    }
+    decompressed.resize(decompressed_size + buf->size());
+    memcpy(decompressed.data() + decompressed_size, buf->data(), buf->size());
+    decompressed_size += buf->size();
+  }
+  *out = std::move(decompressed);
+  return Status::OK();
+}
+
+void CheckCompressedInputStream(Codec* codec, const std::vector<uint8_t>& 
data) {
+  // Create compressed data
+  auto compressed = CompressDataOneShot(codec, data);
+
+  std::vector<uint8_t> decompressed;
+  ASSERT_OK(RunCompressedInputStream(codec, compressed, &decompressed));
+
+  ASSERT_EQ(decompressed.size(), data.size());
+  ASSERT_EQ(decompressed, data);
+}
+
+void CheckCompressedOutputStream(Codec* codec, const std::vector<uint8_t>& 
data,
+                                 bool do_flush) {
+  // Create compressed output stream
+  std::shared_ptr<BufferOutputStream> buffer_writer;
+  ASSERT_OK(BufferOutputStream::Create(1024, default_memory_pool(), 
&buffer_writer));
+  std::shared_ptr<CompressedOutputStream> stream;
+  ASSERT_OK(CompressedOutputStream::Make(codec, buffer_writer, &stream));
+
+  const uint8_t* input = data.data();
+  int64_t input_len = data.size();
+  const int64_t chunk_size = 1111;
+  while (input_len > 0) {
+    int64_t nbytes = std::min(chunk_size, input_len);
+    ASSERT_OK(stream->Write(input, nbytes));
+    input += nbytes;
+    input_len -= nbytes;
+    if (do_flush) {
+      ASSERT_OK(stream->Flush());
+    }
+  }
+  ASSERT_OK(stream->Close());
+
+  // Get compressed data and decompress it
+  std::shared_ptr<Buffer> compressed;
+  ASSERT_OK(buffer_writer->Finish(&compressed));
+  std::vector<uint8_t> decompressed(data.size());
+  ASSERT_OK(codec->Decompress(compressed->size(), compressed->data(), 
decompressed.size(),
+                              decompressed.data()));
+  ASSERT_EQ(decompressed, data);
+}
+
+class CompressedInputStreamTest : public 
::testing::TestWithParam<Compression::type> {
+ protected:
+  Compression::type GetCompression() { return GetParam(); }
+
+  std::unique_ptr<Codec> MakeCodec() {
+    std::unique_ptr<Codec> codec;
+    ABORT_NOT_OK(Codec::Create(GetCompression(), &codec));
+    return codec;
+  }
+};
+
+TEST_P(CompressedInputStreamTest, CompressibleData) {
+  auto codec = MakeCodec();
+  auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
+
+  CheckCompressedInputStream(codec.get(), data);
+}
+
+TEST_P(CompressedInputStreamTest, RandomData) {
+  auto codec = MakeCodec();
+  auto data = MakeRandomData(RANDOM_DATA_SIZE);
+
+  CheckCompressedInputStream(codec.get(), data);
+}
+
+TEST_P(CompressedInputStreamTest, TruncatedData) {
+  auto codec = MakeCodec();
+  auto data = MakeRandomData(10000);
+  auto compressed = CompressDataOneShot(codec.get(), data);
+  auto truncated = SliceBuffer(compressed, 0, compressed->size() - 3);
+
+  std::vector<uint8_t> decompressed;
+  ASSERT_RAISES(IOError, RunCompressedInputStream(codec.get(), truncated, 
&decompressed));
+}
+
+TEST_P(CompressedInputStreamTest, InvalidData) {
+  auto codec = MakeCodec();
+  auto compressed_data = MakeRandomData(10000);
+
+  auto buffer_reader = 
std::make_shared<BufferReader>(Buffer::Wrap(compressed_data));
+  std::shared_ptr<CompressedInputStream> stream;
+  ASSERT_OK(CompressedInputStream::Make(codec.get(), buffer_reader, &stream));
+  std::shared_ptr<Buffer> out_buf;
+  ASSERT_RAISES(IOError, stream->Read(1024, &out_buf));
+}
+
+// NOTE: Snappy doesn't support streaming decompression
+
+// NOTE: LZ4 streaming decompression uses the LZ4 framing format,
+// which must be tested against a streaming compressor
+
+INSTANTIATE_TEST_CASE_P(TestGZipInputStream, CompressedInputStreamTest,
+                        ::testing::Values(Compression::GZIP));
+
+INSTANTIATE_TEST_CASE_P(TestZSTDInputStream, CompressedInputStreamTest,
+                        ::testing::Values(Compression::ZSTD));
+
+INSTANTIATE_TEST_CASE_P(TestBrotliInputStream, CompressedInputStreamTest,
+                        ::testing::Values(Compression::BROTLI));
+
+class CompressedOutputStreamTest : public 
::testing::TestWithParam<Compression::type> {
+ protected:
+  Compression::type GetCompression() { return GetParam(); }
+
+  std::unique_ptr<Codec> MakeCodec() {
+    std::unique_ptr<Codec> codec;
+    ABORT_NOT_OK(Codec::Create(GetCompression(), &codec));
+    return codec;
+  }
+};
+
+TEST_P(CompressedOutputStreamTest, CompressibleData) {
+  auto codec = MakeCodec();
+  auto data = MakeCompressibleData(COMPRESSIBLE_DATA_SIZE);
+
+  CheckCompressedOutputStream(codec.get(), data, false /* do_flush */);
+  CheckCompressedOutputStream(codec.get(), data, true /* do_flush */);
+}
+
+TEST_P(CompressedOutputStreamTest, RandomData) {
+  auto codec = MakeCodec();
+  auto data = MakeRandomData(RANDOM_DATA_SIZE);
+
+  CheckCompressedOutputStream(codec.get(), data, false /* do_flush */);
+  CheckCompressedOutputStream(codec.get(), data, true /* do_flush */);
+}
+
+INSTANTIATE_TEST_CASE_P(TestGZipOutputStream, CompressedOutputStreamTest,
+                        ::testing::Values(Compression::GZIP));
+
+INSTANTIATE_TEST_CASE_P(TestZSTDOutputStream, CompressedOutputStreamTest,
+                        ::testing::Values(Compression::ZSTD));
+
+INSTANTIATE_TEST_CASE_P(TestBrotliOutputStream, CompressedOutputStreamTest,
+                        ::testing::Values(Compression::BROTLI));
+
+}  // namespace io
+}  // namespace arrow

Reply via email to