This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f0316  ARROW-3409: [C++] Streaming compression and decompression interfaces
65f0316 is described below

commit 65f03168f20c9429768f64b761209365a99cbada
Author: Antoine Pitrou <[email protected]>
AuthorDate: Mon Oct 15 14:16:04 2018 +0200

    ARROW-3409: [C++] Streaming compression and decompression interfaces
    
    Author: Antoine Pitrou <[email protected]>
    
    Closes #2696 from pitrou/ARROW-3409-streaming-compression and squashes the following commits:
    
    891bfaf4 <Antoine Pitrou> Fix conversion warning
    a4ace53c <Antoine Pitrou> Address review comments
    ce904081 <Antoine Pitrou> ARROW-3409:  Streaming compression and decompression interfaces
---
 cpp/src/arrow/symbols.map                |   1 +
 cpp/src/arrow/util/compression-test.cc   | 337 +++++++++++++++++++++++++++++--
 cpp/src/arrow/util/compression.cc        |   4 +
 cpp/src/arrow/util/compression.h         |  82 +++++++-
 cpp/src/arrow/util/compression_brotli.cc | 174 +++++++++++++++-
 cpp/src/arrow/util/compression_brotli.h  |   5 +
 cpp/src/arrow/util/compression_lz4.cc    | 222 +++++++++++++++++++-
 cpp/src/arrow/util/compression_lz4.h     |   5 +
 cpp/src/arrow/util/compression_snappy.cc |   8 +
 cpp/src/arrow/util/compression_snappy.h  |   5 +
 cpp/src/arrow/util/compression_zlib.cc   | 296 ++++++++++++++++++++++++++-
 cpp/src/arrow/util/compression_zlib.h    |   4 +
 cpp/src/arrow/util/compression_zstd.cc   | 178 +++++++++++++++-
 cpp/src/arrow/util/compression_zstd.h    |   5 +
 14 files changed, 1293 insertions(+), 33 deletions(-)

diff --git a/cpp/src/arrow/symbols.map b/cpp/src/arrow/symbols.map
index 96faf59..18802ff 100644
--- a/cpp/src/arrow/symbols.map
+++ b/cpp/src/arrow/symbols.map
@@ -49,6 +49,7 @@
     _tr_*;
     # lz4
     LZ4_*;
+    LZ4F_*;
     # zstandard
     ZSTD_*;
     ZSTDv*;
diff --git a/cpp/src/arrow/util/compression-test.cc b/cpp/src/arrow/util/compression-test.cc
index 11f99e3..10447c6 100644
--- a/cpp/src/arrow/util/compression-test.cc
+++ b/cpp/src/arrow/util/compression-test.cc
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cmath>
 #include <cstdint>
+#include <cstring>
 #include <memory>
+#include <random>
 #include <string>
 #include <vector>
 
@@ -31,13 +34,33 @@ using std::vector;
 namespace arrow {
 namespace util {
 
-template <Compression::type CODEC>
-void CheckCodecRoundtrip(const vector<uint8_t>& data) {
+vector<uint8_t> MakeRandomData(int data_size) {
+  vector<uint8_t> data(data_size);
+  random_bytes(data_size, 1234, data.data());
+  return data;
+}
+
+vector<uint8_t> MakeCompressibleData(int data_size) {
+  std::string base_data =
+      "Apache Arrow is a cross-language development platform for in-memory data";
+  int nrepeats = static_cast<int>(1 + data_size / base_data.size());
+
+  vector<uint8_t> data(base_data.size() * nrepeats);
+  for (int i = 0; i < nrepeats; ++i) {
+    std::memcpy(data.data() + i * base_data.size(), base_data.data(), 
base_data.size());
+  }
+  data.resize(data_size);
+  return data;
+}
+
+// Check roundtrip of one-shot compression and decompression functions.
+
+void CheckCodecRoundtrip(Compression::type ctype, const vector<uint8_t>& data) 
{
   // create multiple compressors to try to break them
   std::unique_ptr<Codec> c1, c2;
 
-  ASSERT_OK(Codec::Create(CODEC, &c1));
-  ASSERT_OK(Codec::Create(CODEC, &c2));
+  ASSERT_OK(Codec::Create(ctype, &c1));
+  ASSERT_OK(Codec::Create(ctype, &c2));
 
   int max_compressed_len =
       static_cast<int>(c1->MaxCompressedLen(data.size(), data.data()));
@@ -69,25 +92,311 @@ void CheckCodecRoundtrip(const vector<uint8_t>& data) {
   ASSERT_EQ(data, decompressed);
 }
 
-template <Compression::type CODEC>
-void CheckCodec() {
+// Check the streaming compressor against one-shot decompression
+
+void CheckStreamingCompressor(Codec* codec, const vector<uint8_t>& data) {
+  std::shared_ptr<Compressor> compressor;
+  ASSERT_OK(codec->MakeCompressor(&compressor));
+
+  std::vector<uint8_t> compressed;
+  int64_t compressed_size = 0;
+  const uint8_t* input = data.data();
+  int64_t remaining = data.size();
+
+  compressed.resize(10);
+  bool do_flush = false;
+
+  while (remaining > 0) {
+    // Feed a small amount each time
+    int64_t input_len = std::min(remaining, static_cast<int64_t>(1111));
+    int64_t output_len = compressed.size() - compressed_size;
+    uint8_t* output = compressed.data() + compressed_size;
+    int64_t bytes_read, bytes_written;
+    ASSERT_OK(compressor->Compress(input_len, input, output_len, output, 
&bytes_read,
+                                   &bytes_written));
+    ASSERT_LE(bytes_read, input_len);
+    ASSERT_LE(bytes_written, output_len);
+    compressed_size += bytes_written;
+    input += bytes_read;
+    remaining -= bytes_read;
+    if (bytes_read == 0) {
+      compressed.resize(compressed.capacity() * 2);
+    }
+    // Once every two iterations, do a flush
+    if (do_flush) {
+      bool should_retry = false;
+      do {
+        output_len = compressed.size() - compressed_size;
+        output = compressed.data() + compressed_size;
+        ASSERT_OK(compressor->Flush(output_len, output, &bytes_written, 
&should_retry));
+        ASSERT_LE(bytes_written, output_len);
+        compressed_size += bytes_written;
+        if (should_retry) {
+          compressed.resize(compressed.capacity() * 2);
+        }
+      } while (should_retry);
+    }
+    do_flush = !do_flush;
+  }
+
+  // End the compressed stream
+  bool should_retry = false;
+  do {
+    int64_t output_len = compressed.size() - compressed_size;
+    uint8_t* output = compressed.data() + compressed_size;
+    int64_t bytes_written;
+    ASSERT_OK(compressor->End(output_len, output, &bytes_written, 
&should_retry));
+    ASSERT_LE(bytes_written, output_len);
+    compressed_size += bytes_written;
+    if (should_retry) {
+      compressed.resize(compressed.capacity() * 2);
+    }
+  } while (should_retry);
+
+  // Check decompressing the compressed data
+  std::vector<uint8_t> decompressed(data.size());
+  ASSERT_OK(codec->Decompress(compressed_size, compressed.data(), 
decompressed.size(),
+                              decompressed.data()));
+
+  ASSERT_EQ(data, decompressed);
+}
+
+// Check the streaming decompressor against one-shot compression
+
+void CheckStreamingDecompressor(Codec* codec, const vector<uint8_t>& data) {
+  // Create compressed data
+  int64_t max_compressed_len = codec->MaxCompressedLen(data.size(), 
data.data());
+  std::vector<uint8_t> compressed(max_compressed_len);
+  int64_t compressed_size;
+  ASSERT_OK(codec->Compress(data.size(), data.data(), max_compressed_len,
+                            compressed.data(), &compressed_size));
+  compressed.resize(compressed_size);
+
+  // Run streaming decompression
+  std::shared_ptr<Decompressor> decompressor;
+  ASSERT_OK(codec->MakeDecompressor(&decompressor));
+
+  std::vector<uint8_t> decompressed;
+  int64_t decompressed_size = 0;
+  const uint8_t* input = compressed.data();
+  int64_t remaining = compressed.size();
+
+  decompressed.resize(10);
+  while (!decompressor->IsFinished()) {
+    // Feed a small amount each time
+    int64_t input_len = std::min(remaining, static_cast<int64_t>(23));
+    int64_t output_len = decompressed.size() - decompressed_size;
+    uint8_t* output = decompressed.data() + decompressed_size;
+    int64_t bytes_read, bytes_written;
+    bool need_more_output;
+    ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output, 
&bytes_read,
+                                       &bytes_written, &need_more_output));
+    ASSERT_LE(bytes_read, input_len);
+    ASSERT_LE(bytes_written, output_len);
+    ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
+        << "Decompression not progressing anymore";
+    if (need_more_output) {
+      decompressed.resize(decompressed.capacity() * 2);
+    }
+    decompressed_size += bytes_written;
+    input += bytes_read;
+    remaining -= bytes_read;
+  }
+  ASSERT_TRUE(decompressor->IsFinished());
+  ASSERT_EQ(remaining, 0);
+
+  // Check the decompressed data
+  decompressed.resize(decompressed_size);
+  ASSERT_EQ(data.size(), decompressed_size);
+  ASSERT_EQ(data, decompressed);
+}
+
+// Check the streaming compressor and decompressor together
+
+void CheckStreamingRoundtrip(Codec* codec, const vector<uint8_t>& data) {
+  std::shared_ptr<Compressor> compressor;
+  std::shared_ptr<Decompressor> decompressor;
+  ASSERT_OK(codec->MakeCompressor(&compressor));
+  ASSERT_OK(codec->MakeDecompressor(&decompressor));
+
+  std::default_random_engine engine(42);
+  std::uniform_int_distribution<int> buf_size_distribution(10, 40);
+
+  auto make_buf_size = [&]() -> int64_t { return 
buf_size_distribution(engine); };
+
+  // Compress...
+
+  std::vector<uint8_t> compressed(1);
+  int64_t compressed_size = 0;
+  {
+    const uint8_t* input = data.data();
+    int64_t remaining = data.size();
+
+    while (remaining > 0) {
+      // Feed a varying amount each time
+      int64_t input_len = std::min(remaining, make_buf_size());
+      int64_t output_len = compressed.size() - compressed_size;
+      uint8_t* output = compressed.data() + compressed_size;
+      int64_t bytes_read, bytes_written;
+      ASSERT_OK(compressor->Compress(input_len, input, output_len, output, 
&bytes_read,
+                                     &bytes_written));
+      ASSERT_LE(bytes_read, input_len);
+      ASSERT_LE(bytes_written, output_len);
+      compressed_size += bytes_written;
+      input += bytes_read;
+      remaining -= bytes_read;
+      if (bytes_read == 0) {
+        compressed.resize(compressed.capacity() * 2);
+      }
+    }
+    // End the compressed stream
+    bool should_retry = false;
+    do {
+      int64_t output_len = compressed.size() - compressed_size;
+      uint8_t* output = compressed.data() + compressed_size;
+      int64_t bytes_written;
+      ASSERT_OK(compressor->End(output_len, output, &bytes_written, 
&should_retry));
+      ASSERT_LE(bytes_written, output_len);
+      compressed_size += bytes_written;
+      if (should_retry) {
+        compressed.resize(compressed.capacity() * 2);
+      }
+    } while (should_retry);
+
+    compressed.resize(compressed_size);
+  }
+
+  // Then decompress...
+
+  std::vector<uint8_t> decompressed(2);
+  int64_t decompressed_size = 0;
+  {
+    const uint8_t* input = compressed.data();
+    int64_t remaining = compressed.size();
+
+    while (!decompressor->IsFinished()) {
+      // Feed a varying amount each time
+      int64_t input_len = std::min(remaining, make_buf_size());
+      int64_t output_len = decompressed.size() - decompressed_size;
+      uint8_t* output = decompressed.data() + decompressed_size;
+      int64_t bytes_read, bytes_written;
+      bool need_more_output;
+      ASSERT_OK(decompressor->Decompress(input_len, input, output_len, output,
+                                         &bytes_read, &bytes_written, 
&need_more_output));
+      ASSERT_LE(bytes_read, input_len);
+      ASSERT_LE(bytes_written, output_len);
+      ASSERT_TRUE(need_more_output || bytes_written > 0 || bytes_read > 0)
+          << "Decompression not progressing anymore";
+      if (need_more_output) {
+        decompressed.resize(decompressed.capacity() * 2);
+      }
+      decompressed_size += bytes_written;
+      input += bytes_read;
+      remaining -= bytes_read;
+    }
+    ASSERT_EQ(remaining, 0);
+    decompressed.resize(decompressed_size);
+  }
+
+  ASSERT_EQ(data.size(), decompressed.size());
+  ASSERT_EQ(data, decompressed);
+}
+
+class CodecTest : public ::testing::TestWithParam<Compression::type> {
+ protected:
+  Compression::type GetCompression() { return GetParam(); }
+
+  std::unique_ptr<Codec> MakeCodec() {
+    std::unique_ptr<Codec> codec;
+    ABORT_NOT_OK(Codec::Create(GetCompression(), &codec));
+    return codec;
+  }
+};
+
+TEST_P(CodecTest, CodecRoundtrip) {
   int sizes[] = {0, 10000, 100000};
   for (int data_size : sizes) {
-    vector<uint8_t> data(data_size);
-    random_bytes(data_size, 1234, data.data());
-    CheckCodecRoundtrip<CODEC>(data);
+    vector<uint8_t> data = MakeRandomData(data_size);
+    CheckCodecRoundtrip(GetCompression(), data);
+
+    data = MakeCompressibleData(data_size);
+    CheckCodecRoundtrip(GetCompression(), data);
+  }
+}
+
+TEST_P(CodecTest, StreamingCompressor) {
+  if (GetCompression() == Compression::SNAPPY) {
+    // SKIP: snappy doesn't support streaming compression
+    return;
+  }
+  if (GetCompression() == Compression::LZ4) {
+    // SKIP: LZ4 streaming compression uses the LZ4 framing format,
+    // which must be tested against a streaming decompressor
+    return;
+  }
+
+  int sizes[] = {0, 10, 100000};
+  for (int data_size : sizes) {
+    auto codec = MakeCodec();
+
+    vector<uint8_t> data = MakeRandomData(data_size);
+    CheckStreamingCompressor(codec.get(), data);
+
+    data = MakeCompressibleData(data_size);
+    CheckStreamingCompressor(codec.get(), data);
+  }
+}
+
+TEST_P(CodecTest, StreamingDecompressor) {
+  if (GetCompression() == Compression::SNAPPY) {
+    // SKIP: snappy doesn't support streaming decompression
+    return;
+  }
+  if (GetCompression() == Compression::LZ4) {
+    // SKIP: LZ4 streaming decompression uses the LZ4 framing format,
+    // which must be tested against a streaming compressor
+    return;
+  }
+
+  int sizes[] = {0, 10, 100000};
+  for (int data_size : sizes) {
+    auto codec = MakeCodec();
+
+    vector<uint8_t> data = MakeRandomData(data_size);
+    CheckStreamingDecompressor(codec.get(), data);
+
+    data = MakeCompressibleData(data_size);
+    CheckStreamingDecompressor(codec.get(), data);
+  }
+}
+
+TEST_P(CodecTest, StreamingRoundtrip) {
+  if (GetCompression() == Compression::SNAPPY) {
+    // SKIP: snappy doesn't support streaming decompression
+    return;
+  }
+
+  int sizes[] = {0, 10, 100000};
+  for (int data_size : sizes) {
+    auto codec = MakeCodec();
+
+    vector<uint8_t> data = MakeRandomData(data_size);
+    CheckStreamingRoundtrip(codec.get(), data);
+
+    data = MakeCompressibleData(data_size);
+    CheckStreamingRoundtrip(codec.get(), data);
   }
 }
 
-TEST(TestCompressors, Snappy) { CheckCodec<Compression::SNAPPY>(); }
+INSTANTIATE_TEST_CASE_P(TestGZip, CodecTest, 
::testing::Values(Compression::GZIP));
 
-TEST(TestCompressors, Brotli) { CheckCodec<Compression::BROTLI>(); }
+INSTANTIATE_TEST_CASE_P(TestZSTD, CodecTest, 
::testing::Values(Compression::ZSTD));
 
-TEST(TestCompressors, GZip) { CheckCodec<Compression::GZIP>(); }
+INSTANTIATE_TEST_CASE_P(TestSnappy, CodecTest, 
::testing::Values(Compression::SNAPPY));
 
-TEST(TestCompressors, ZSTD) { CheckCodec<Compression::ZSTD>(); }
+INSTANTIATE_TEST_CASE_P(TestLZ4, CodecTest, 
::testing::Values(Compression::LZ4));
 
-TEST(TestCompressors, Lz4) { CheckCodec<Compression::LZ4>(); }
+INSTANTIATE_TEST_CASE_P(TestBrotli, CodecTest, 
::testing::Values(Compression::BROTLI));
 
 }  // namespace util
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc
index 459034a..8d4d838 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -44,6 +44,10 @@
 namespace arrow {
 namespace util {
 
+Compressor::~Compressor() {}
+
+Decompressor::~Decompressor() {}
+
 Codec::~Codec() {}
 
 Status Codec::Create(Compression::type codec_type, std::unique_ptr<Codec>* 
result) {
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 8c6d7b7..f34ae43 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -21,32 +21,112 @@
 #include <cstdint>
 #include <memory>
 
-#include "arrow/status.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
+class Status;
+
 struct Compression {
   enum type { UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO };
 };
 
 namespace util {
 
+/// \brief Streaming compressor interface
+///
+class ARROW_EXPORT Compressor {
+ public:
+  virtual ~Compressor();
+
+  /// \brief Compress some input.
+  ///
+  /// If bytes_read is 0 on return, then a larger output buffer should be supplied.
+  virtual Status Compress(int64_t input_len, const uint8_t* input, int64_t 
output_len,
+                          uint8_t* output, int64_t* bytes_read,
+                          int64_t* bytes_written) = 0;
+
+  /// \brief Flush part of the compressed output.
+  ///
+  /// If should_retry is true on return, Flush() should be called again
+  /// with a larger buffer.
+  virtual Status Flush(int64_t output_len, uint8_t* output, int64_t* 
bytes_written,
+                       bool* should_retry) = 0;
+
+  /// \brief End compressing, doing whatever is necessary to end the stream.
+  ///
+  /// If should_retry is true on return, End() should be called again
+  /// with a larger buffer.  Otherwise, the Compressor should not be used anymore.
+  ///
+  /// End() implies Flush().
+  virtual Status End(int64_t output_len, uint8_t* output, int64_t* 
bytes_written,
+                     bool* should_retry) = 0;
+
+  // XXX add methods for buffer size heuristics?
+};
+
+/// \brief Streaming decompressor interface
+///
+class ARROW_EXPORT Decompressor {
+ public:
+  virtual ~Decompressor();
+
+  /// \brief Decompress some input.
+  ///
+  /// If need_more_output is true on return, a larger output buffer needs
+  /// to be supplied.
+  /// XXX is need_more_output necessary? (Brotli?)
+  virtual Status Decompress(int64_t input_len, const uint8_t* input, int64_t 
output_len,
+                            uint8_t* output, int64_t* bytes_read, int64_t* 
bytes_written,
+                            bool* need_more_output) = 0;
+
+  /// \brief Return whether the compressed stream is finished.
+  ///
+  /// This is a heuristic.  If true is returned, then it is guaranteed
+  /// that the stream is finished.  If false is returned, however, it may
+  /// simply be that the underlying library isn't able to provide the information.
+  virtual bool IsFinished() = 0;
+
+  // XXX add methods for buffer size heuristics?
+};
+
 class ARROW_EXPORT Codec {
  public:
   virtual ~Codec();
 
   static Status Create(Compression::type codec, std::unique_ptr<Codec>* out);
 
+  /// \brief One-shot decompression function
+  ///
+  /// output_len must be correct and therefore be obtained in advance.
+  ///
+  /// \note One-shot decompression is not always compatible with streaming
+  /// compression.  Depending on the codec (e.g. LZ4), different formats may
+  /// be used.
   virtual Status Decompress(int64_t input_len, const uint8_t* input, int64_t 
output_len,
                             uint8_t* output_buffer) = 0;
 
+  /// \brief One-shot compression function
+  ///
+  /// output_buffer_len must first have been computed using MaxCompressedLen().
+  ///
+  /// \note One-shot compression is not always compatible with streaming
+  /// decompression.  Depending on the codec (e.g. LZ4), different formats may
+  /// be used.
   virtual Status Compress(int64_t input_len, const uint8_t* input,
                           int64_t output_buffer_len, uint8_t* output_buffer,
                           int64_t* output_length) = 0;
 
   virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 
0;
 
+  // XXX Should be able to choose compression level, or presets? ("fast", etc.)
+
+  /// \brief Create a streaming compressor instance
+  virtual Status MakeCompressor(std::shared_ptr<Compressor>* out) = 0;
+
+  /// \brief Create a streaming decompressor instance
+  virtual Status MakeDecompressor(std::shared_ptr<Decompressor>* out) = 0;
+
   virtual const char* name() const = 0;
 };
 
diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc
index 3dcaf99..1b8ab85 100644
--- a/cpp/src/arrow/util/compression_brotli.cc
+++ b/cpp/src/arrow/util/compression_brotli.cc
@@ -19,19 +19,184 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <sstream>
 
 #include <brotli/decode.h>
 #include <brotli/encode.h>
 #include <brotli/types.h>
 
 #include "arrow/status.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 
 namespace arrow {
 namespace util {
 
+// Brotli compression quality is max (11) by default, which is slow.
+// We use 8 as a default as it is the best trade-off for Parquet workload.
+constexpr int kBrotliDefaultCompressionLevel = 8;
+
 // ----------------------------------------------------------------------
-// Brotli implementation
+// Brotli decompressor implementation
+
+class BrotliDecompressor : public Decompressor {
+ public:
+  BrotliDecompressor() {}
+
+  ~BrotliDecompressor() override {
+    if (state_ != nullptr) {
+      BrotliDecoderDestroyInstance(state_);
+    }
+  }
+
+  Status Init() {
+    state_ = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
+    if (state_ == nullptr) {
+      return BrotliError("Brotli init failed");
+    }
+    return Status::OK();
+  }
+
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t 
output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* 
bytes_written,
+                    bool* need_more_output) override {
+    auto avail_in = static_cast<size_t>(input_len);
+    auto avail_out = static_cast<size_t>(output_len);
+    BrotliDecoderResult ret;
+
+    ret = BrotliDecoderDecompressStream(state_, &avail_in, &input, &avail_out, 
&output,
+                                        nullptr /* total_out */);
+    if (ret == BROTLI_DECODER_RESULT_ERROR) {
+      return BrotliError(BrotliDecoderGetErrorCode(state_), "Brotli decompress failed: ");
+    }
+    *need_more_output = (ret == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT);
+    *bytes_read = static_cast<int64_t>(input_len - avail_in);
+    *bytes_written = static_cast<int64_t>(output_len - avail_out);
+    return Status::OK();
+  }
+
+  bool IsFinished() override { return BrotliDecoderIsFinished(state_); }
+
+ protected:
+  Status BrotliError(const char* msg) { return Status::IOError(msg); }
+
+  Status BrotliError(BrotliDecoderErrorCode code, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << BrotliDecoderErrorString(code);
+    return Status::IOError(ss.str());
+  }
+
+  BrotliDecoderState* state_ = nullptr;
+};
+
+// ----------------------------------------------------------------------
+// Brotli compressor implementation
+
+class BrotliCompressor : public Compressor {
+ public:
+  BrotliCompressor() {}
+
+  ~BrotliCompressor() override {
+    if (state_ != nullptr) {
+      BrotliEncoderDestroyInstance(state_);
+    }
+  }
+
+  Status Init() {
+    state_ = BrotliEncoderCreateInstance(nullptr, nullptr, nullptr);
+    if (state_ == nullptr) {
+      return BrotliError("Brotli init failed");
+    }
+    if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY,
+                                   kBrotliDefaultCompressionLevel)) {
+      return BrotliError("Brotli set compression level failed");
+    }
+    return Status::OK();
+  }
+
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read, int64_t* 
bytes_written) override;
+
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  Status BrotliError(const char* msg) { return Status::IOError(msg); }
+
+  BrotliEncoderState* state_ = nullptr;
+};
+
+Status BrotliCompressor::Compress(int64_t input_len, const uint8_t* input,
+                                  int64_t output_len, uint8_t* output,
+                                  int64_t* bytes_read, int64_t* bytes_written) 
{
+  auto avail_in = static_cast<size_t>(input_len);
+  auto avail_out = static_cast<size_t>(output_len);
+  BROTLI_BOOL ret;
+
+  ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_PROCESS, 
&avail_in, &input,
+                                    &avail_out, &output, nullptr /* total_out 
*/);
+  if (!ret) {
+    return BrotliError("Brotli compress failed");
+  }
+  *bytes_read = static_cast<int64_t>(input_len - avail_in);
+  *bytes_written = static_cast<int64_t>(output_len - avail_out);
+  return Status::OK();
+}
+
+Status BrotliCompressor::Flush(int64_t output_len, uint8_t* output,
+                               int64_t* bytes_written, bool* should_retry) {
+  size_t avail_in = 0;
+  const uint8_t* next_in = nullptr;
+  auto avail_out = static_cast<size_t>(output_len);
+  BROTLI_BOOL ret;
+
+  ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_FLUSH, &avail_in, 
&next_in,
+                                    &avail_out, &output, nullptr /* total_out 
*/);
+  if (!ret) {
+    return BrotliError("Brotli flush failed");
+  }
+  *bytes_written = static_cast<int64_t>(output_len - avail_out);
+  *should_retry = !!BrotliEncoderHasMoreOutput(state_);
+  return Status::OK();
+}
+
+Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* 
bytes_written,
+                             bool* should_retry) {
+  size_t avail_in = 0;
+  const uint8_t* next_in = nullptr;
+  auto avail_out = static_cast<size_t>(output_len);
+  BROTLI_BOOL ret;
+
+  ret = BrotliEncoderCompressStream(state_, BROTLI_OPERATION_FINISH, 
&avail_in, &next_in,
+                                    &avail_out, &output, nullptr /* total_out 
*/);
+  if (!ret) {
+    return BrotliError("Brotli end failed");
+  }
+  *bytes_written = static_cast<int64_t>(output_len - avail_out);
+  *should_retry = !!BrotliEncoderHasMoreOutput(state_);
+  DCHECK_EQ(*should_retry, !BrotliEncoderIsFinished(state_));
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Brotli codec implementation
+
+Status BrotliCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  auto ptr = std::make_shared<BrotliCompressor>();
+  RETURN_NOT_OK(ptr->Init());
+  *out = ptr;
+  return Status::OK();
+}
+
+Status BrotliCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  auto ptr = std::make_shared<BrotliDecompressor>();
+  RETURN_NOT_OK(ptr->Init());
+  *out = ptr;
+  return Status::OK();
+}
 
 Status BrotliCodec::Decompress(int64_t input_len, const uint8_t* input,
                                int64_t output_len, uint8_t* output_buffer) {
@@ -52,10 +217,9 @@ Status BrotliCodec::Compress(int64_t input_len, const uint8_t* input,
                              int64_t output_buffer_len, uint8_t* output_buffer,
                              int64_t* output_length) {
   std::size_t output_len = output_buffer_len;
-  // TODO: Make quality configurable. We use 8 as a default as it is the best
-  //       trade-off for Parquet workload
-  if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, 
input_len,
-                            input, &output_len, output_buffer) == 
BROTLI_FALSE) {
+  if (BrotliEncoderCompress(kBrotliDefaultCompressionLevel, 
BROTLI_DEFAULT_WINDOW,
+                            BROTLI_DEFAULT_MODE, input_len, input, &output_len,
+                            output_buffer) == BROTLI_FALSE) {
     return Status::IOError("Brotli compression failure.");
   }
   *output_length = output_len;
diff --git a/cpp/src/arrow/util/compression_brotli.h b/cpp/src/arrow/util/compression_brotli.h
index 23fb321..0b403ee 100644
--- a/cpp/src/arrow/util/compression_brotli.h
+++ b/cpp/src/arrow/util/compression_brotli.h
@@ -19,6 +19,7 @@
 #define ARROW_UTIL_COMPRESSION_BROTLI_H
 
 #include <cstdint>
+#include <memory>
 
 #include "arrow/status.h"
 #include "arrow/util/compression.h"
@@ -38,6 +39,10 @@ class ARROW_EXPORT BrotliCodec : public Codec {
 
   int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
 
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+  Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
   const char* name() const override { return "brotli"; }
 };
 
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 58228f5..7ba1f12 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -18,17 +18,237 @@
 #include "arrow/util/compression_lz4.h"
 
 #include <cstdint>
+#include <sstream>
 
 #include <lz4.h>
+#include <lz4frame.h>
 
 #include "arrow/status.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 
 namespace arrow {
 namespace util {
 
 // ----------------------------------------------------------------------
-// Lz4 implementation
+// Lz4 decompressor implementation
+
+class LZ4Decompressor : public Decompressor {
+ public:
+  LZ4Decompressor() {}
+
+  ~LZ4Decompressor() override {
+    if (ctx_ != nullptr) {
+      ARROW_UNUSED(LZ4F_freeDecompressionContext(ctx_));
+    }
+  }
+
+  Status Init() {
+    LZ4F_errorCode_t ret;
+    finished_ = false;
+
+    ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION);
+    if (LZ4F_isError(ret)) {
+      return LZ4Error(ret, "LZ4 init failed: ");
+    } else {
+      return Status::OK();
+    }
+  }
+
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t 
output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* 
bytes_written,
+                    bool* need_more_output) override {
+    auto src = input;
+    auto dst = output;
+    auto srcSize = static_cast<size_t>(input_len);
+    auto dstCapacity = static_cast<size_t>(output_len);
+    size_t ret;
+
+    ret = LZ4F_decompress(ctx_, dst, &dstCapacity, src, &srcSize, nullptr /* 
options */);
+    if (LZ4F_isError(ret)) {
+      return LZ4Error(ret, "LZ4 decompress failed: ");
+    }
+    *bytes_read = static_cast<int64_t>(srcSize);
+    *bytes_written = static_cast<int64_t>(dstCapacity);
+    *need_more_output = (*bytes_read == 0 && *bytes_written == 0);
+    finished_ = (ret == 0);
+    return Status::OK();
+  }
+
+  bool IsFinished() override { return finished_; }
+
+ protected:
+  Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << LZ4F_getErrorName(ret);
+    return Status::IOError(ss.str());
+  }
+
+  LZ4F_dctx* ctx_ = nullptr;
+  bool finished_;
+};
+
+// ----------------------------------------------------------------------
+// Lz4 compressor implementation
+
+class LZ4Compressor : public Compressor {
+ public:
+  LZ4Compressor() {}
+
+  ~LZ4Compressor() override {
+    if (ctx_ != nullptr) {
+      ARROW_UNUSED(LZ4F_freeCompressionContext(ctx_));
+    }
+  }
+
+  Status Init() {
+    LZ4F_errorCode_t ret;
+    memset(&prefs_, 0, sizeof(prefs_));
+    first_time_ = true;
+
+    ret = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
+    if (LZ4F_isError(ret)) {
+      return LZ4Error(ret, "LZ4 init failed: ");
+    } else {
+      return Status::OK();
+    }
+  }
+
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read, int64_t* 
bytes_written) override;
+
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  Status LZ4Error(LZ4F_errorCode_t ret, const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg << LZ4F_getErrorName(ret);
+    return Status::IOError(ss.str());
+  }
+
+  LZ4F_cctx* ctx_ = nullptr;
+  LZ4F_preferences_t prefs_;
+  bool first_time_;
+};
+
+// Emit the LZ4 frame header the first time any compressor method runs.
+//
+// Must be expanded inside an LZ4Compressor method that has a local `size_t ret`
+// and a `bytes_written` out-pointer in scope.  `dst` and `dstCapacity` are
+// modifiable lvalues: on success they are advanced / reduced by the header
+// size, which is also added to *bytes_written, and first_time_ is cleared.
+//
+// Early exits from the *enclosing* function:
+// - returns Status::OK() without writing anything when `dstCapacity` is below
+//   LZ4F_HEADER_SIZE_MAX (first_time_ stays set, so a later call tries again);
+// - returns an IOError when LZ4F_compressBegin fails.
+#define BEGIN_COMPRESS(dst, dstCapacity)                       \
+  if (first_time_) {                                           \
+    if (dstCapacity < LZ4F_HEADER_SIZE_MAX) {                  \
+      /* Output too small to write LZ4F header */              \
+      return Status::OK();                                     \
+    }                                                          \
+    ret = LZ4F_compressBegin(ctx_, dst, dstCapacity, &prefs_); \
+    if (LZ4F_isError(ret)) {                                   \
+      return LZ4Error(ret, "LZ4 compress begin failed: ");     \
+    }                                                          \
+    first_time_ = false;                                       \
+    dst += ret;                                                \
+    dstCapacity -= ret;                                        \
+    *bytes_written += static_cast<int64_t>(ret);               \
+  }
+
+// Feed `input_len` bytes to the LZ4 frame compressor.
+//
+// The input is consumed all-or-nothing: when `output` cannot hold the
+// worst-case size reported by LZ4F_compressBound, nothing is read
+// (*bytes_read == 0) and the caller is expected to come back with a larger
+// buffer.  *bytes_written also counts the frame header if this call wrote it.
+Status LZ4Compressor::Compress(int64_t input_len, const uint8_t* input,
+                               int64_t output_len, uint8_t* output, int64_t* bytes_read,
+                               int64_t* bytes_written) {
+  const auto in_size = static_cast<size_t>(input_len);
+  uint8_t* out_ptr = output;
+  auto out_avail = static_cast<size_t>(output_len);
+  size_t ret;
+
+  *bytes_written = 0;
+  *bytes_read = 0;
+
+  // May write the frame header, advancing out_ptr / shrinking out_avail
+  BEGIN_COMPRESS(out_ptr, out_avail);
+
+  const size_t worst_case = LZ4F_compressBound(in_size, &prefs_);
+  if (out_avail < worst_case) {
+    // Output too small to compress into
+    return Status::OK();
+  }
+
+  ret = LZ4F_compressUpdate(ctx_, out_ptr, out_avail, input, in_size,
+                            nullptr /* options */);
+  if (LZ4F_isError(ret)) {
+    return LZ4Error(ret, "LZ4 compress update failed: ");
+  }
+  *bytes_written += static_cast<int64_t>(ret);
+  *bytes_read = input_len;
+  DCHECK_LE(*bytes_written, output_len);
+  return Status::OK();
+}
+
+// Flush data buffered inside the LZ4 context into `output`.
+//
+// *should_retry stays true whenever the call returned before LZ4F_flush could
+// run because the output buffer was too small (either for the frame header or
+// for the flush itself).
+Status LZ4Compressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                            bool* should_retry) {
+  uint8_t* out_ptr = output;
+  auto out_avail = static_cast<size_t>(output_len);
+  size_t ret;
+
+  // Pessimistic defaults, overwritten once the flush went through
+  *should_retry = true;
+  *bytes_written = 0;
+
+  BEGIN_COMPRESS(out_ptr, out_avail);
+
+  const size_t worst_case = LZ4F_compressBound(0, &prefs_);
+  if (out_avail < worst_case) {
+    // Output too small to flush into
+    return Status::OK();
+  }
+
+  ret = LZ4F_flush(ctx_, out_ptr, out_avail, nullptr /* options */);
+  if (LZ4F_isError(ret)) {
+    return LZ4Error(ret, "LZ4 flush failed: ");
+  }
+  *should_retry = false;
+  *bytes_written += static_cast<int64_t>(ret);
+  DCHECK_LE(*bytes_written, output_len);
+  return Status::OK();
+}
+
+// Finish the LZ4 frame, writing whatever LZ4F_compressEnd produces to `output`.
+//
+// *should_retry stays true whenever the call returned before
+// LZ4F_compressEnd could run because the output buffer was too small.
+Status LZ4Compressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                          bool* should_retry) {
+  uint8_t* out_ptr = output;
+  auto out_avail = static_cast<size_t>(output_len);
+  size_t ret;
+
+  // Pessimistic defaults, overwritten once the frame has been closed
+  *should_retry = true;
+  *bytes_written = 0;
+
+  BEGIN_COMPRESS(out_ptr, out_avail);
+
+  const size_t worst_case = LZ4F_compressBound(0, &prefs_);
+  if (out_avail < worst_case) {
+    // Output too small to end frame into
+    return Status::OK();
+  }
+
+  ret = LZ4F_compressEnd(ctx_, out_ptr, out_avail, nullptr /* options */);
+  if (LZ4F_isError(ret)) {
+    return LZ4Error(ret, "LZ4 end failed: ");
+  }
+  *should_retry = false;
+  *bytes_written += static_cast<int64_t>(ret);
+  DCHECK_LE(*bytes_written, output_len);
+  return Status::OK();
+}
+
+#undef BEGIN_COMPRESS
+
+// ----------------------------------------------------------------------
+// Lz4 codec implementation
+
+// Create and initialize a streaming LZ4 compressor.
+// *out is only assigned when initialization succeeded.
+Status Lz4Codec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  std::shared_ptr<LZ4Compressor> compressor = std::make_shared<LZ4Compressor>();
+  RETURN_NOT_OK(compressor->Init());
+  *out = compressor;
+  return Status::OK();
+}
+
+// Create and initialize a streaming LZ4 decompressor.
+// *out is only assigned when initialization succeeded.
+Status Lz4Codec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  std::shared_ptr<LZ4Decompressor> decompressor = std::make_shared<LZ4Decompressor>();
+  RETURN_NOT_OK(decompressor->Init());
+  *out = decompressor;
+  return Status::OK();
+}
 
 Status Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, int64_t 
output_len,
                             uint8_t* output_buffer) {
diff --git a/cpp/src/arrow/util/compression_lz4.h b/cpp/src/arrow/util/compression_lz4.h
index 2b7b999..8c4bcf5 100644
--- a/cpp/src/arrow/util/compression_lz4.h
+++ b/cpp/src/arrow/util/compression_lz4.h
@@ -19,6 +19,7 @@
 #define ARROW_UTIL_COMPRESSION_LZ4_H
 
 #include <cstdint>
+#include <memory>
 
 #include "arrow/status.h"
 #include "arrow/util/compression.h"
@@ -38,6 +39,10 @@ class ARROW_EXPORT Lz4Codec : public Codec {
 
   int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
 
+  // Create a streaming LZ4 compressor; on success it is stored in *out.
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+  // Create a streaming LZ4 decompressor; on success it is stored in *out.
+  Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
   const char* name() const override { return "lz4"; }
 };
 
diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc
index 0f58f18..cc55408 100644
--- a/cpp/src/arrow/util/compression_snappy.cc
+++ b/cpp/src/arrow/util/compression_snappy.cc
@@ -33,6 +33,14 @@ namespace util {
 // ----------------------------------------------------------------------
 // Snappy implementation
 
+// Streaming compression is not available for this codec: always returns
+// NotImplemented and never touches the output pointer.
+// The parameter is left unnamed through ARROW_ARG_UNUSED, like the unused
+// `output_len` of SnappyCodec::Decompress in this file.
+Status SnappyCodec::MakeCompressor(std::shared_ptr<Compressor>* ARROW_ARG_UNUSED(out)) {
+  return Status::NotImplemented("Streaming compression unsupported with Snappy");
+}
+
+// Streaming decompression is not available for this codec: always returns
+// NotImplemented and never touches the output pointer.
+// The parameter is left unnamed through ARROW_ARG_UNUSED, like the unused
+// `output_len` of SnappyCodec::Decompress in this file.
+Status SnappyCodec::MakeDecompressor(
+    std::shared_ptr<Decompressor>* ARROW_ARG_UNUSED(out)) {
+  return Status::NotImplemented("Streaming decompression unsupported with Snappy");
+}
+
 Status SnappyCodec::Decompress(int64_t input_len, const uint8_t* input,
                                int64_t ARROW_ARG_UNUSED(output_len),
                                uint8_t* output_buffer) {
diff --git a/cpp/src/arrow/util/compression_snappy.h b/cpp/src/arrow/util/compression_snappy.h
index fcbb689..8c25111 100644
--- a/cpp/src/arrow/util/compression_snappy.h
+++ b/cpp/src/arrow/util/compression_snappy.h
@@ -19,6 +19,7 @@
 #define ARROW_UTIL_COMPRESSION_SNAPPY_H
 
 #include <cstdint>
+#include <memory>
 
 #include "arrow/status.h"
 #include "arrow/util/compression.h"
@@ -37,6 +38,10 @@ class ARROW_EXPORT SnappyCodec : public Codec {
 
   int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
 
+  // Streaming compression is unsupported with Snappy: returns NotImplemented.
+  // Marked `override` like MakeDecompressor below, so that a signature drift
+  // from the Codec base class becomes a compile error.
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+  // Streaming decompression is unsupported with Snappy: returns NotImplemented.
+  Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
   const char* name() const override { return "snappy"; }
 };
 
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index 883b8fe..cb3baff 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -17,8 +17,10 @@
 
 #include "arrow/util/compression_zlib.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <limits>
 #include <memory>
 #include <sstream>
 #include <string>
@@ -33,6 +35,8 @@
 namespace arrow {
 namespace util {
 
+// NOTE(review): despite its name, the deflateInit2() calls in this file pass
+// this constant in the memLevel position, while the compression level argument
+// is Z_DEFAULT_COMPRESSION -- confirm which of the two was intended.
+constexpr int kGZipDefaultCompressionLevel = 9;
+
 // ----------------------------------------------------------------------
 // gzip implementation
 
@@ -48,6 +52,263 @@ static constexpr int GZIP_CODEC = 16;
 // Determine if this is libz or gzip from header.
 static constexpr int DETECT_CODEC = 32;
 
+// Translate a GZipCodec format into the windowBits argument of deflateInit2():
+// negated for DEFLATE, offset by GZIP_CODEC for GZIP, and the plain
+// WINDOW_BITS value otherwise (ZLIB).
+static int CompressionWindowBitsForFormat(GZipCodec::Format format) {
+  if (format == GZipCodec::DEFLATE) {
+    return -WINDOW_BITS;
+  }
+  if (format == GZipCodec::GZIP) {
+    return WINDOW_BITS + GZIP_CODEC;
+  }
+  return WINDOW_BITS;
+}
+
+// Translate a GZipCodec format into the windowBits argument of inflateInit2().
+// DEFLATE uses the negated window size; every other format sets the
+// DETECT_CODEC flag so that the format is autodetected from the header.
+static int DecompressionWindowBitsForFormat(GZipCodec::Format format) {
+  const bool is_deflate = (format == GZipCodec::DEFLATE);
+  return is_deflate ? -WINDOW_BITS : (WINDOW_BITS | DETECT_CODEC);
+}
+
+// ----------------------------------------------------------------------
+// gzip decompressor implementation
+
+// Streaming decompressor built on zlib's inflate().
+//
+// Init() selects the expected stream format; Decompress() is then called
+// repeatedly until IsFinished() reports that the end of the stream was seen.
+class GZipDecompressor : public Decompressor {
+ public:
+  // finished_ is initialized here too so that IsFinished() is well-defined
+  // even before Init() has been called.
+  GZipDecompressor() : initialized_(false), finished_(false) {}
+
+  ~GZipDecompressor() override {
+    if (initialized_) {
+      inflateEnd(&stream_);
+    }
+  }
+
+  // Set up the zlib inflate state for `format`.  Must be called exactly once.
+  Status Init(GZipCodec::Format format) {
+    DCHECK(!initialized_);
+    memset(&stream_, 0, sizeof(stream_));
+    finished_ = false;
+
+    int ret;
+    int window_bits = DecompressionWindowBitsForFormat(format);
+    if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
+      return ZlibError("zlib inflateInit failed: ");
+    } else {
+      initialized_ = true;
+      return Status::OK();
+    }
+  }
+
+  // Decompress as much of `input` as fits into `output`.
+  //
+  // On return *bytes_read / *bytes_written hold the number of bytes actually
+  // consumed / produced.  *need_more_output is set when zlib could make no
+  // progress at all (Z_BUF_ERROR).
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
+                    bool* need_more_output) override {
+    static constexpr auto size_limit =
+        static_cast<int64_t>(std::numeric_limits<uInt>::max());
+    // zlib counts with uInt, so at most `size_limit` bytes can be handed over
+    // per call.  Remember the clamped sizes: progress must be measured against
+    // them, not against the (possibly larger) caller-supplied lengths.
+    const auto avail_in = static_cast<uInt>(std::min(input_len, size_limit));
+    const auto avail_out = static_cast<uInt>(std::min(output_len, size_limit));
+
+    stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+    stream_.avail_in = avail_in;
+    stream_.next_out = reinterpret_cast<Bytef*>(output);
+    stream_.avail_out = avail_out;
+
+    const int ret = inflate(&stream_, Z_SYNC_FLUSH);
+    if (ret == Z_DATA_ERROR || ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR) {
+      return ZlibError("zlib inflate failed: ");
+    }
+    if (ret == Z_NEED_DICT) {
+      return ZlibError("zlib inflate failed (need preset dictionary): ");
+    }
+    if (ret == Z_BUF_ERROR) {
+      // No progress was possible
+      *bytes_read = 0;
+      *bytes_written = 0;
+      *need_more_output = true;
+    } else {
+      DCHECK(ret == Z_OK || ret == Z_STREAM_END);
+      // Some progress has been made
+      *bytes_read = static_cast<int64_t>(avail_in - stream_.avail_in);
+      *bytes_written = static_cast<int64_t>(avail_out - stream_.avail_out);
+      *need_more_output = false;
+    }
+    finished_ = (ret == Z_STREAM_END);
+    return Status::OK();
+  }
+
+  bool IsFinished() override { return finished_; }
+
+ protected:
+  // Build an IOError from `prefix_msg` and zlib's last message, if any.
+  Status ZlibError(const char* prefix_msg) {
+    std::stringstream ss;
+    ss << prefix_msg;
+    if (stream_.msg && *stream_.msg) {
+      ss << stream_.msg;
+    } else {
+      ss << "(unknown error)";
+    }
+    return Status::IOError(ss.str());
+  }
+
+  z_stream stream_;
+  bool initialized_;
+  bool finished_;
+};
+
+// ----------------------------------------------------------------------
+// gzip compressor implementation
+
+// Streaming compressor built on zlib's deflate().
+//
+// Init() must be called once before Compress(), Flush() or End(); the deflate
+// state is released by End() on success, or by the destructor otherwise.
+class GZipCompressor : public Compressor {
+ public:
+  GZipCompressor() : initialized_(false) {}
+
+  ~GZipCompressor() override {
+    if (!initialized_) {
+      return;
+    }
+    deflateEnd(&stream_);
+  }
+
+  // Set up the zlib deflate state so that it emits the given `format`.
+  Status Init(GZipCodec::Format format) {
+    DCHECK(!initialized_);
+    memset(&stream_, 0, sizeof(stream_));
+
+    // Initialize to run specified format
+    // NOTE(review): kGZipDefaultCompressionLevel lands in deflateInit2's
+    // memLevel slot; the level itself is Z_DEFAULT_COMPRESSION -- confirm intent.
+    const int window_bits = CompressionWindowBitsForFormat(format);
+    const int ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
+                                 kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY);
+    if (ret != Z_OK) {
+      return ZlibError("zlib deflateInit failed: ");
+    }
+    initialized_ = true;
+    return Status::OK();
+  }
+
+  // Compress `input` into `output`; reports consumed and produced byte counts.
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override;
+
+  // Flush pending output; `*should_retry` asks for another call with more room.
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  // Finish the stream; `*should_retry` asks for another call with more room.
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  // Build an IOError from `prefix_msg` and zlib's last message, if any.
+  Status ZlibError(const char* prefix_msg) {
+    const bool has_msg = (stream_.msg != nullptr) && (*stream_.msg != '\0');
+    std::stringstream text;
+    text << prefix_msg << (has_msg ? stream_.msg : "(unknown error)");
+    return Status::IOError(text.str());
+  }
+
+  z_stream stream_;
+  bool initialized_;
+};
+
+// Compress as much of `input` as fits into `output`.
+//
+// On return *bytes_read / *bytes_written hold the number of bytes actually
+// consumed / produced; both are 0 when zlib could make no progress
+// (Z_BUF_ERROR).
+Status GZipCompressor::Compress(int64_t input_len, const uint8_t* input,
+                                int64_t output_len, uint8_t* output, int64_t* bytes_read,
+                                int64_t* bytes_written) {
+  DCHECK(initialized_) << "Called on non-initialized stream";
+
+  static constexpr auto size_limit =
+      static_cast<int64_t>(std::numeric_limits<uInt>::max());
+  // zlib counts with uInt, so at most `size_limit` bytes can be handed over per
+  // call.  Remember the clamped sizes: progress must be measured against them,
+  // not against the (possibly larger) caller-supplied lengths.
+  const auto avail_in = static_cast<uInt>(std::min(input_len, size_limit));
+  const auto avail_out = static_cast<uInt>(std::min(output_len, size_limit));
+
+  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
+  stream_.avail_in = avail_in;
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = avail_out;
+
+  const int ret = deflate(&stream_, Z_NO_FLUSH);
+  if (ret == Z_STREAM_ERROR) {
+    return ZlibError("zlib compress failed: ");
+  }
+  if (ret == Z_OK) {
+    // Some progress has been made
+    *bytes_read = static_cast<int64_t>(avail_in - stream_.avail_in);
+    *bytes_written = static_cast<int64_t>(avail_out - stream_.avail_out);
+  } else {
+    // No progress was possible
+    DCHECK_EQ(ret, Z_BUF_ERROR);
+    *bytes_read = 0;
+    *bytes_written = 0;
+  }
+  return Status::OK();
+}
+
+// Flush all pending compressed data into `output` (Z_SYNC_FLUSH).
+//
+// *bytes_written receives the number of bytes produced by this call.
+// *should_retry is set when the output buffer was filled completely, in which
+// case the flush may be incomplete and the call must be repeated with more
+// output space.
+Status GZipCompressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                             bool* should_retry) {
+  DCHECK(initialized_) << "Called on non-initialized stream";
+
+  static constexpr auto size_limit =
+      static_cast<int64_t>(std::numeric_limits<uInt>::max());
+  // zlib counts with uInt: remember the clamped size so that the produced byte
+  // count is measured against what zlib was really given.
+  const auto avail_out = static_cast<uInt>(std::min(output_len, size_limit));
+
+  stream_.avail_in = 0;
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = avail_out;
+
+  const int ret = deflate(&stream_, Z_SYNC_FLUSH);
+  if (ret == Z_STREAM_ERROR) {
+    return ZlibError("zlib flush failed: ");
+  }
+  if (ret == Z_OK) {
+    *bytes_written = static_cast<int64_t>(avail_out - stream_.avail_out);
+  } else {
+    DCHECK_EQ(ret, Z_BUF_ERROR);
+    *bytes_written = 0;
+  }
+  // "If deflate returns with avail_out == 0, this function must be called
+  //  again with the same value of the flush parameter and more output space
+  //  (updated avail_out), until the flush is complete (deflate returns
+  //  with non-zero avail_out)."
+  // Hence the retry decision is based on the remaining output space, not on
+  // whether this particular call happened to write something.
+  *should_retry = (stream_.avail_out == 0);
+  return Status::OK();
+}
+
+// Finish the compressed stream (Z_FINISH) and release the deflate state.
+//
+// *bytes_written receives the number of bytes produced by this call.
+// *should_retry is set while zlib has not yet reported Z_STREAM_END; the call
+// must then be repeated with more output space.  Once the stream has ended the
+// deflate state is destroyed and the compressor must not be used again.
+Status GZipCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                           bool* should_retry) {
+  DCHECK(initialized_) << "Called on non-initialized stream";
+
+  static constexpr auto size_limit =
+      static_cast<int64_t>(std::numeric_limits<uInt>::max());
+  // zlib counts with uInt: remember the clamped size so that the produced byte
+  // count is measured against what zlib was really given.
+  const auto avail_out = static_cast<uInt>(std::min(output_len, size_limit));
+
+  stream_.avail_in = 0;
+  stream_.next_out = reinterpret_cast<Bytef*>(output);
+  stream_.avail_out = avail_out;
+
+  const int ret = deflate(&stream_, Z_FINISH);
+  if (ret == Z_STREAM_ERROR) {
+    return ZlibError("zlib flush failed: ");
+  }
+  *bytes_written = static_cast<int64_t>(avail_out - stream_.avail_out);
+  if (ret != Z_STREAM_END) {
+    // Not everything could be flushed,
+    *should_retry = true;
+    return Status::OK();
+  }
+  // Flush complete, we can now end the stream
+  *should_retry = false;
+  initialized_ = false;
+  if (deflateEnd(&stream_) != Z_OK) {
+    return ZlibError("zlib end failed: ");
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// gzip codec implementation
+
 class GZipCodec::GZipCodecImpl {
  public:
   explicit GZipCodecImpl(GZipCodec::Format format)
@@ -60,20 +321,29 @@ class GZipCodec::GZipCodecImpl {
     EndDecompressor();
   }
 
+  // Create a streaming compressor initialized for this codec's format_.
+  // *out is only assigned when initialization succeeded.
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) {
+    std::shared_ptr<GZipCompressor> compressor = std::make_shared<GZipCompressor>();
+    RETURN_NOT_OK(compressor->Init(format_));
+    *out = compressor;
+    return Status::OK();
+  }
+
+  // Create a streaming decompressor initialized for this codec's format_.
+  // *out is only assigned when initialization succeeded.
+  Status MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+    std::shared_ptr<GZipDecompressor> decompressor = std::make_shared<GZipDecompressor>();
+    RETURN_NOT_OK(decompressor->Init(format_));
+    *out = decompressor;
+    return Status::OK();
+  }
+
   Status InitCompressor() {
     EndDecompressor();
     memset(&stream_, 0, sizeof(stream_));
 
     int ret;
     // Initialize to run specified format
-    int window_bits = WINDOW_BITS;
-    if (format_ == DEFLATE) {
-      window_bits = -window_bits;
-    } else if (format_ == GZIP) {
-      window_bits += GZIP_CODEC;
-    }
-    if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 
window_bits, 9,
-                            Z_DEFAULT_STRATEGY)) != Z_OK) {
+    int window_bits = CompressionWindowBitsForFormat(format_);
+    if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 
window_bits,
+                            kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) 
!= Z_OK) {
       std::stringstream ss;
       ss << "zlib deflateInit failed: " << std::string(stream_.msg);
       return Status::IOError(ss.str());
@@ -95,7 +365,7 @@ class GZipCodec::GZipCodecImpl {
     int ret;
 
     // Initialize to run either deflate or zlib/gzip format
-    int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | 
DETECT_CODEC;
+    int window_bits = DecompressionWindowBitsForFormat(format_);
     if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
       std::stringstream ss;
       ss << "zlib inflateInit failed: " << std::string(stream_.msg);
@@ -249,6 +519,14 @@ Status GZipCodec::Compress(int64_t input_length, const 
uint8_t* input,
   return impl_->Compress(input_length, input, output_buffer_len, output, 
output_length);
 }
 
+// Create a streaming compressor; the work is delegated to impl_.
+Status GZipCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  return impl_->MakeCompressor(out);
+}
+
+// Create a streaming decompressor; the work is delegated to impl_.
+Status GZipCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  return impl_->MakeDecompressor(out);
+}
+
 const char* GZipCodec::name() const { return "gzip"; }
 
 }  // namespace util
diff --git a/cpp/src/arrow/util/compression_zlib.h b/cpp/src/arrow/util/compression_zlib.h
index 1e66728..f934198 100644
--- a/cpp/src/arrow/util/compression_zlib.h
+++ b/cpp/src/arrow/util/compression_zlib.h
@@ -49,6 +49,10 @@ class ARROW_EXPORT GZipCodec : public Codec {
 
   int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
 
+  // Create a streaming compressor; on success it is stored in *out.
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+  // Create a streaming decompressor; on success it is stored in *out.
+  Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
   const char* name() const override;
 
  private:
diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc
index 4b9feee..4064f29 100644
--- a/cpp/src/arrow/util/compression_zstd.cc
+++ b/cpp/src/arrow/util/compression_zstd.cc
@@ -19,6 +19,7 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <sstream>
 
 #include <zstd.h>
 
@@ -30,8 +31,178 @@ using std::size_t;
 namespace arrow {
 namespace util {
 
+// XXX level = 1 probably doesn't compress very much
+constexpr int kZSTDDefaultCompressionLevel = 1;
+
+// ----------------------------------------------------------------------
+// ZSTD decompressor implementation
+
+// Streaming decompressor built on the ZSTD_DStream API.
+//
+// The stream object is created in the constructor and freed in the
+// destructor; Init() must succeed before Decompress() is used.
+class ZSTDDecompressor : public Decompressor {
+ public:
+  ZSTDDecompressor() : stream_(ZSTD_createDStream()) {}
+
+  ~ZSTDDecompressor() override { ZSTD_freeDStream(stream_); }
+
+  // (Re)initialize the decompression stream and clear the finished flag.
+  Status Init() {
+    finished_ = false;
+    const size_t rc = ZSTD_initDStream(stream_);
+    if (ZSTD_isError(rc)) {
+      return ZSTDError(rc, "zstd init failed: ");
+    }
+    return Status::OK();
+  }
+
+  // Decompress as much of `input` as fits into `output`.
+  //
+  // *bytes_read / *bytes_written report how far zstd advanced in each buffer.
+  // *need_more_output is set when neither buffer advanced at all.
+  Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                    uint8_t* output, int64_t* bytes_read, int64_t* bytes_written,
+                    bool* need_more_output) override {
+    ZSTD_outBuffer sink;
+    sink.dst = output;
+    sink.size = static_cast<size_t>(output_len);
+    sink.pos = 0;
+
+    ZSTD_inBuffer source;
+    source.src = input;
+    source.size = static_cast<size_t>(input_len);
+    source.pos = 0;
+
+    const size_t rc = ZSTD_decompressStream(stream_, &sink, &source);
+    if (ZSTD_isError(rc)) {
+      return ZSTDError(rc, "zstd decompress failed: ");
+    }
+    *bytes_read = static_cast<int64_t>(source.pos);
+    *bytes_written = static_cast<int64_t>(sink.pos);
+    const bool made_progress = (*bytes_read != 0) || (*bytes_written != 0);
+    *need_more_output = !made_progress;
+    // The stream is considered finished once zstd returns 0
+    finished_ = (rc == 0);
+    return Status::OK();
+  }
+
+  bool IsFinished() override { return finished_; }
+
+ protected:
+  // Build an IOError whose message is `prefix_msg` followed by the zstd error name.
+  Status ZSTDError(size_t ret, const char* prefix_msg) {
+    std::stringstream message;
+    message << prefix_msg;
+    message << ZSTD_getErrorName(ret);
+    return Status::IOError(message.str());
+  }
+
+  ZSTD_DStream* stream_;
+  bool finished_;
+};
+
 // ----------------------------------------------------------------------
-// ZSTD implementation
+// ZSTD compressor implementation
+
+// Streaming compressor built on the ZSTD_CStream API.
+//
+// The stream object is created in the constructor and freed in the
+// destructor; Init() must succeed before Compress(), Flush() or End() are used.
+class ZSTDCompressor : public Compressor {
+ public:
+  ZSTDCompressor() : stream_(ZSTD_createCStream()) {}
+
+  ~ZSTDCompressor() override { ZSTD_freeCStream(stream_); }
+
+  // Initialize the compression stream at kZSTDDefaultCompressionLevel.
+  Status Init() {
+    const size_t rc = ZSTD_initCStream(stream_, kZSTDDefaultCompressionLevel);
+    if (ZSTD_isError(rc)) {
+      return ZSTDError(rc, "zstd init failed: ");
+    }
+    return Status::OK();
+  }
+
+  // Compress `input` into `output`; reports consumed and produced byte counts.
+  Status Compress(int64_t input_len, const uint8_t* input, int64_t output_len,
+                  uint8_t* output, int64_t* bytes_read, int64_t* bytes_written) override;
+
+  // Flush pending output; `*should_retry` asks for another call with more room.
+  Status Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+               bool* should_retry) override;
+
+  // Finish the frame; `*should_retry` asks for another call with more room.
+  Status End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+             bool* should_retry) override;
+
+ protected:
+  // Build an IOError whose message is `prefix_msg` followed by the zstd error name.
+  Status ZSTDError(size_t ret, const char* prefix_msg) {
+    std::stringstream message;
+    message << prefix_msg;
+    message << ZSTD_getErrorName(ret);
+    return Status::IOError(message.str());
+  }
+
+  ZSTD_CStream* stream_;
+};
+
+// Feed `input` to the zstd stream, writing compressed bytes to `output`.
+// *bytes_read / *bytes_written report how far zstd advanced in each buffer.
+Status ZSTDCompressor::Compress(int64_t input_len, const uint8_t* input,
+                                int64_t output_len, uint8_t* output, int64_t* bytes_read,
+                                int64_t* bytes_written) {
+  ZSTD_outBuffer sink;
+  sink.dst = output;
+  sink.size = static_cast<size_t>(output_len);
+  sink.pos = 0;
+
+  ZSTD_inBuffer source;
+  source.src = input;
+  source.size = static_cast<size_t>(input_len);
+  source.pos = 0;
+
+  const size_t rc = ZSTD_compressStream(stream_, &sink, &source);
+  if (ZSTD_isError(rc)) {
+    return ZSTDError(rc, "zstd compress failed: ");
+  }
+  *bytes_read = static_cast<int64_t>(source.pos);
+  *bytes_written = static_cast<int64_t>(sink.pos);
+  return Status::OK();
+}
+
+// Flush data buffered inside the zstd stream into `output`.
+// *should_retry is set when ZSTD_flushStream returns a non-zero value.
+Status ZSTDCompressor::Flush(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                             bool* should_retry) {
+  ZSTD_outBuffer sink;
+  sink.dst = output;
+  sink.size = static_cast<size_t>(output_len);
+  sink.pos = 0;
+
+  const size_t remaining = ZSTD_flushStream(stream_, &sink);
+  if (ZSTD_isError(remaining)) {
+    return ZSTDError(remaining, "zstd flush failed: ");
+  }
+  *bytes_written = static_cast<int64_t>(sink.pos);
+  *should_retry = (remaining != 0);
+  return Status::OK();
+}
+
+// Finish the zstd frame, writing the remaining bytes into `output`.
+// *should_retry is set when ZSTD_endStream returns a non-zero value.
+Status ZSTDCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_written,
+                           bool* should_retry) {
+  ZSTD_outBuffer sink;
+  sink.dst = output;
+  sink.size = static_cast<size_t>(output_len);
+  sink.pos = 0;
+
+  const size_t remaining = ZSTD_endStream(stream_, &sink);
+  if (ZSTD_isError(remaining)) {
+    return ZSTDError(remaining, "zstd end failed: ");
+  }
+  *bytes_written = static_cast<int64_t>(sink.pos);
+  *should_retry = (remaining != 0);
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// ZSTD codec implementation
+
+// Create and initialize a streaming zstd compressor.
+// *out is only assigned when initialization succeeded.
+Status ZSTDCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
+  std::shared_ptr<ZSTDCompressor> compressor = std::make_shared<ZSTDCompressor>();
+  RETURN_NOT_OK(compressor->Init());
+  *out = compressor;
+  return Status::OK();
+}
+
+// Create and initialize a streaming zstd decompressor.
+// *out is only assigned when initialization succeeded.
+Status ZSTDCodec::MakeDecompressor(std::shared_ptr<Decompressor>* out) {
+  std::shared_ptr<ZSTDDecompressor> decompressor = std::make_shared<ZSTDDecompressor>();
+  RETURN_NOT_OK(decompressor->Init());
+  *out = decompressor;
+  return Status::OK();
+}
 
 Status ZSTDCodec::Decompress(int64_t input_len, const uint8_t* input, int64_t 
output_len,
                              uint8_t* output_buffer) {
@@ -52,8 +223,9 @@ int64_t ZSTDCodec::MaxCompressedLen(int64_t input_len,
 Status ZSTDCodec::Compress(int64_t input_len, const uint8_t* input,
                            int64_t output_buffer_len, uint8_t* output_buffer,
                            int64_t* output_length) {
-  *output_length = ZSTD_compress(output_buffer, 
static_cast<size_t>(output_buffer_len),
-                                 input, static_cast<size_t>(input_len), 1);
+  *output_length =
+      ZSTD_compress(output_buffer, static_cast<size_t>(output_buffer_len), 
input,
+                    static_cast<size_t>(input_len), 
kZSTDDefaultCompressionLevel);
   if (ZSTD_isError(*output_length)) {
     return Status::IOError("ZSTD compression failure.");
   }
diff --git a/cpp/src/arrow/util/compression_zstd.h b/cpp/src/arrow/util/compression_zstd.h
index 8ebfc2a..06da152 100644
--- a/cpp/src/arrow/util/compression_zstd.h
+++ b/cpp/src/arrow/util/compression_zstd.h
@@ -19,6 +19,7 @@
 #define ARROW_UTIL_COMPRESSION_ZSTD_H
 
 #include <cstdint>
+#include <memory>
 
 #include "arrow/status.h"
 #include "arrow/util/compression.h"
@@ -38,6 +39,10 @@ class ARROW_EXPORT ZSTDCodec : public Codec {
 
   int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override;
 
+  // Create a streaming zstd compressor; on success it is stored in *out.
+  Status MakeCompressor(std::shared_ptr<Compressor>* out) override;
+
+  // Create a streaming zstd decompressor; on success it is stored in *out.
+  Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;
+
   const char* name() const override { return "zstd"; }
 };
 

Reply via email to