This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new eddd363  ARROW-1696: [C++] Add (de)compression benchmarks
eddd363 is described below

commit eddd363782de9c85ce28a61759f3293c34f21262
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Oct 16 22:28:52 2018 -0400

    ARROW-1696: [C++] Add (de)compression benchmarks
    
    The benchmarks don't use realistic data, so the numbers should be taken 
with a pinch of salt.
    
    Example output:
    ```
    
----------------------------------------------------------------------------------------------------------------
    Benchmark                                                         Time      
     CPU Iterations UserCounters...
    
----------------------------------------------------------------------------------------------------------------
    BM_StreamingCompression<Compression::GZIP>/repeats:1            265 ms      
  265 ms          3 ratio=6.95171   30.2214MB/s
    BM_StreamingCompression<Compression::BROTLI>/repeats:1          441 ms      
  441 ms          2 ratio=8.31298   18.1418MB/s
    BM_StreamingCompression<Compression::ZSTD>/repeats:1             27 ms      
   27 ms         26 ratio=7.07352   296.484MB/s
    BM_StreamingCompression<Compression::LZ4>/repeats:1              19 ms      
   19 ms         36 ratio=3.52726   417.971MB/s
    BM_StreamingDecompression<Compression::GZIP>/repeats:1           21 ms      
   21 ms         33 ratio=6.95171   379.959MB/s
    BM_StreamingDecompression<Compression::BROTLI>/repeats:1         14 ms      
   14 ms         50 ratio=8.31298   579.888MB/s
    BM_StreamingDecompression<Compression::ZSTD>/repeats:1           13 ms      
   13 ms         56 ratio=7.07352   634.179MB/s
    BM_StreamingDecompression<Compression::LZ4>/repeats:1             4 ms      
    4 ms        175 ratio=3.52726   1.95598GB/s
    ```
    
    Author: Antoine Pitrou <[email protected]>
    
    Closes #2762 from pitrou/ARROW-1696-compression-benchmarks and squashes the 
following commits:
    
    e50eb65e0 <Antoine Pitrou> Try to fix conversion warnings (again)
    10f62c4e3 <Antoine Pitrou> Try to fix conversion errors
    77a893a01 <Antoine Pitrou> ARROW-1696:  Add (de)compression benchmarks
---
 cpp/src/arrow/util/CMakeLists.txt           |   1 +
 cpp/src/arrow/util/compression-benchmark.cc | 210 ++++++++++++++++++++++++++++
 2 files changed, 211 insertions(+)

diff --git a/cpp/src/arrow/util/CMakeLists.txt 
b/cpp/src/arrow/util/CMakeLists.txt
index c8f7316..69e18a6 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -95,6 +95,7 @@ ADD_ARROW_TEST(lazy-test)
 ADD_ARROW_TEST(logging-test)
 
 ADD_ARROW_BENCHMARK(bit-util-benchmark)
+ADD_ARROW_BENCHMARK(compression-benchmark)
 ADD_ARROW_BENCHMARK(decimal-benchmark)
 ADD_ARROW_BENCHMARK(lazy-benchmark)
 ADD_ARROW_BENCHMARK(number-parsing-benchmark)
diff --git a/cpp/src/arrow/util/compression-benchmark.cc 
b/cpp/src/arrow/util/compression-benchmark.cc
new file mode 100644
index 0000000..715c257
--- /dev/null
+++ b/cpp/src/arrow/util/compression-benchmark.cc
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <cstdint>
+#include <cstring>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "arrow/test-util.h"
+#include "arrow/util/compression.h"
+
+namespace arrow {
+namespace util {
+
// Build a synthetic, partially compressible buffer of `data_size` bytes:
// a known phrase repeated end-to-end, with random bytes sprinkled in at
// exponentially-distributed offsets so the data isn't trivially redundant.
// XXX This isn't a real-world corpus so doesn't really represent the
// comparative qualities of the algorithms.
std::vector<uint8_t> MakeCompressibleData(int data_size) {
  // Start from highly compressible repeated text
  const std::string phrase =
      "Apache Arrow is a cross-language development platform for in-memory data";
  const int copies = static_cast<int>(1 + data_size / phrase.size());

  std::vector<uint8_t> buffer(phrase.size() * copies);
  for (int rep = 0; rep < copies; ++rep) {
    std::memcpy(buffer.data() + rep * phrase.size(), phrase.data(), phrase.size());
  }
  buffer.resize(data_size);

  // Then overwrite scattered bytes with random values to make compression harder.
  // Fixed seed keeps the benchmark input reproducible across runs.
  std::mt19937 rng(42);
  std::exponential_distribution<> gap_dist(0.05);
  std::uniform_int_distribution<> byte_dist(0, 255);

  for (int64_t at = 0; at < data_size;) {
    buffer[at] = static_cast<uint8_t>(byte_dist(rng));
    at += static_cast<int64_t>(gap_dist(rng));
  }

  return buffer;
}
+
+int64_t StreamingCompress(Codec* codec, const std::vector<uint8_t>& data,
+                          std::vector<uint8_t>* compressed_data = nullptr) {
+  if (compressed_data != nullptr) {
+    compressed_data->clear();
+    compressed_data->shrink_to_fit();
+  }
+  std::shared_ptr<Compressor> compressor;
+  ABORT_NOT_OK(codec->MakeCompressor(&compressor));
+
+  const uint8_t* input = data.data();
+  int64_t input_len = data.size();
+  int64_t compressed_size = 0;
+
+  std::vector<uint8_t> output_buffer(1 << 20);  // 1 MB
+
+  while (input_len > 0) {
+    int64_t bytes_read = 0, bytes_written = 0;
+    ABORT_NOT_OK(compressor->Compress(input_len, input, output_buffer.size(),
+                                      output_buffer.data(), &bytes_read, 
&bytes_written));
+    input += bytes_read;
+    input_len -= bytes_read;
+    compressed_size += bytes_written;
+    if (compressed_data != nullptr && bytes_written > 0) {
+      compressed_data->resize(compressed_data->size() + bytes_written);
+      memcpy(compressed_data->data() + compressed_data->size() - bytes_written,
+             output_buffer.data(), bytes_written);
+    }
+    if (bytes_read == 0) {
+      // Need to enlarge output buffer
+      output_buffer.resize(output_buffer.size() * 2);
+    }
+  }
+  while (true) {
+    bool should_retry;
+    int64_t bytes_written;
+    ABORT_NOT_OK(compressor->End(output_buffer.size(), output_buffer.data(),
+                                 &bytes_written, &should_retry));
+    compressed_size += bytes_written;
+    if (compressed_data != nullptr && bytes_written > 0) {
+      compressed_data->resize(compressed_data->size() + bytes_written);
+      memcpy(compressed_data->data() + compressed_data->size() - bytes_written,
+             output_buffer.data(), bytes_written);
+    }
+    if (should_retry) {
+      // Need to enlarge output buffer
+      output_buffer.resize(output_buffer.size() * 2);
+    } else {
+      break;
+    }
+  }
+  return compressed_size;
+}
+
+static void BM_StreamingCompression(
+    Compression::type compression, const std::vector<uint8_t>& data,
+    benchmark::State& state) {  // NOLINT non-const reference
+  std::unique_ptr<Codec> codec;
+  ABORT_NOT_OK(Codec::Create(compression, &codec));
+
+  while (state.KeepRunning()) {
+    int64_t compressed_size = StreamingCompress(codec.get(), data);
+    state.counters["ratio"] =
+        static_cast<double>(data.size()) / 
static_cast<double>(compressed_size);
+  }
+  state.SetBytesProcessed(state.iterations() * data.size());
+}
+
+template <Compression::type COMPRESSION>
+static void BM_StreamingCompression(
+    benchmark::State& state) {                        // NOLINT non-const 
reference
+  auto data = MakeCompressibleData(8 * 1024 * 1024);  // 8 MB
+
+  BM_StreamingCompression(COMPRESSION, data, state);
+}
+
// Benchmark streaming decompression with the given codec.  The input is
// first compressed once (outside the timed loop); each iteration then
// decompresses it in full and checks the round-tripped size.
static void BM_StreamingDecompression(
    Compression::type compression, const std::vector<uint8_t>& data,
    benchmark::State& state) {  // NOLINT non-const reference
  std::unique_ptr<Codec> codec;
  ABORT_NOT_OK(Codec::Create(compression, &codec));

  // One-time compression to produce the benchmark input; not timed.
  std::vector<uint8_t> compressed_data;
  ARROW_UNUSED(StreamingCompress(codec.get(), data, &compressed_data));
  state.counters["ratio"] =
      static_cast<double>(data.size()) / static_cast<double>(compressed_data.size());

  while (state.KeepRunning()) {
    // A fresh decompressor per iteration, so each run starts from scratch.
    std::shared_ptr<Decompressor> decompressor;
    ABORT_NOT_OK(codec->MakeDecompressor(&decompressor));

    const uint8_t* input = compressed_data.data();
    int64_t input_len = compressed_data.size();
    int64_t decompressed_size = 0;

    std::vector<uint8_t> output_buffer(1 << 20);  // 1 MB
    while (!decompressor->IsFinished()) {
      int64_t bytes_read, bytes_written;
      bool need_more_output;
      ABORT_NOT_OK(decompressor->Decompress(input_len, input, output_buffer.size(),
                                            output_buffer.data(), &bytes_read,
                                            &bytes_written, &need_more_output));
      input += bytes_read;
      input_len -= bytes_read;
      decompressed_size += bytes_written;
      if (need_more_output) {
        // Enlarge output buffer
        output_buffer.resize(output_buffer.size() * 2);
      }
    }
    // Sanity check: the decompressed output must round-trip to the input size.
    ARROW_CHECK(decompressed_size == static_cast<int64_t>(data.size()));
  }
  state.SetBytesProcessed(state.iterations() * data.size());
}
+
+template <Compression::type COMPRESSION>
+static void BM_StreamingDecompression(
+    benchmark::State& state) {                        // NOLINT non-const 
reference
+  auto data = MakeCompressibleData(8 * 1024 * 1024);  // 8 MB
+
+  BM_StreamingDecompression(COMPRESSION, data, state);
+}
+
// Register one compression and one decompression benchmark per codec.
// A single repetition suffices since each benchmark already averages over
// many internal iterations; millisecond units match the expected runtimes.
BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::GZIP)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);
BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::BROTLI)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);
BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::ZSTD)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);
BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::LZ4)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);

BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::GZIP)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);
BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::BROTLI)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);
BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::ZSTD)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);
BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::LZ4)
    ->Unit(benchmark::kMillisecond)
    ->Repetitions(1);
+
+}  // namespace util
+}  // namespace arrow

Reply via email to