This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new eddd363 ARROW-1696: [C++] Add (de)compression benchmarks
eddd363 is described below
commit eddd363782de9c85ce28a61759f3293c34f21262
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Oct 16 22:28:52 2018 -0400
ARROW-1696: [C++] Add (de)compression benchmarks
The benchmarks don't use realistic data, so the numbers should be taken
with a pinch of salt.
Example output:
```
----------------------------------------------------------------------------------------------------------------
Benchmark                                                      Time           CPU Iterations UserCounters...
----------------------------------------------------------------------------------------------------------------
BM_StreamingCompression<Compression::GZIP>/repeats:1         265 ms        265 ms          3 ratio=6.95171 30.2214MB/s
BM_StreamingCompression<Compression::BROTLI>/repeats:1       441 ms        441 ms          2 ratio=8.31298 18.1418MB/s
BM_StreamingCompression<Compression::ZSTD>/repeats:1          27 ms         27 ms         26 ratio=7.07352 296.484MB/s
BM_StreamingCompression<Compression::LZ4>/repeats:1           19 ms         19 ms         36 ratio=3.52726 417.971MB/s
BM_StreamingDecompression<Compression::GZIP>/repeats:1        21 ms         21 ms         33 ratio=6.95171 379.959MB/s
BM_StreamingDecompression<Compression::BROTLI>/repeats:1      14 ms         14 ms         50 ratio=8.31298 579.888MB/s
BM_StreamingDecompression<Compression::ZSTD>/repeats:1        13 ms         13 ms         56 ratio=7.07352 634.179MB/s
BM_StreamingDecompression<Compression::LZ4>/repeats:1          4 ms          4 ms        175 ratio=3.52726 1.95598GB/s
```
Author: Antoine Pitrou <[email protected]>
Closes #2762 from pitrou/ARROW-1696-compression-benchmarks and squashes the
following commits:
e50eb65e0 <Antoine Pitrou> Try to fix conversion warnings (again)
10f62c4e3 <Antoine Pitrou> Try to fix conversion errors
77a893a01 <Antoine Pitrou> ARROW-1696: Add (de)compression benchmarks
---
cpp/src/arrow/util/CMakeLists.txt | 1 +
cpp/src/arrow/util/compression-benchmark.cc | 210 ++++++++++++++++++++++++++++
2 files changed, 211 insertions(+)
diff --git a/cpp/src/arrow/util/CMakeLists.txt
b/cpp/src/arrow/util/CMakeLists.txt
index c8f7316..69e18a6 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -95,6 +95,7 @@ ADD_ARROW_TEST(lazy-test)
ADD_ARROW_TEST(logging-test)
ADD_ARROW_BENCHMARK(bit-util-benchmark)
+ADD_ARROW_BENCHMARK(compression-benchmark)
ADD_ARROW_BENCHMARK(decimal-benchmark)
ADD_ARROW_BENCHMARK(lazy-benchmark)
ADD_ARROW_BENCHMARK(number-parsing-benchmark)
diff --git a/cpp/src/arrow/util/compression-benchmark.cc
b/cpp/src/arrow/util/compression-benchmark.cc
new file mode 100644
index 0000000..715c257
--- /dev/null
+++ b/cpp/src/arrow/util/compression-benchmark.cc
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <cstdint>
+#include <cstring>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "arrow/test-util.h"
+#include "arrow/util/compression.h"
+
+namespace arrow {
+namespace util {
+
// Build `data_size` bytes of semi-compressible test data: a highly
// repetitive base string, with random bytes sprinkled in at
// exponentially-distributed offsets.  Deterministic (fixed seed 42).
// XXX This isn't a real-world corpus so doesn't really represent the
// comparative qualities of the algorithms.
std::vector<uint8_t> MakeCompressibleData(int data_size) {
  // Start from a highly compressible repeated pattern.
  const std::string base_data =
      "Apache Arrow is a cross-language development platform for in-memory data";
  const int nrepeats = static_cast<int>(1 + data_size / base_data.size());

  std::vector<uint8_t> buffer(base_data.size() * nrepeats);
  for (int rep = 0; rep < nrepeats; ++rep) {
    std::memcpy(buffer.data() + rep * base_data.size(), base_data.data(),
                base_data.size());
  }
  buffer.resize(data_size);

  // Then randomly mutate some bytes so as to make things harder for the codec.
  std::mt19937 engine(42);
  std::exponential_distribution<> offsets(0.05);
  std::uniform_int_distribution<> values(0, 255);

  // NOTE: the engine is consumed in the same order as before (value, then
  // offset) so the generated corpus is unchanged.
  for (int64_t pos = 0; pos < data_size;) {
    buffer[pos] = static_cast<uint8_t>(values(engine));
    pos += static_cast<int64_t>(offsets(engine));
  }

  return buffer;
}
+
+int64_t StreamingCompress(Codec* codec, const std::vector<uint8_t>& data,
+ std::vector<uint8_t>* compressed_data = nullptr) {
+ if (compressed_data != nullptr) {
+ compressed_data->clear();
+ compressed_data->shrink_to_fit();
+ }
+ std::shared_ptr<Compressor> compressor;
+ ABORT_NOT_OK(codec->MakeCompressor(&compressor));
+
+ const uint8_t* input = data.data();
+ int64_t input_len = data.size();
+ int64_t compressed_size = 0;
+
+ std::vector<uint8_t> output_buffer(1 << 20); // 1 MB
+
+ while (input_len > 0) {
+ int64_t bytes_read = 0, bytes_written = 0;
+ ABORT_NOT_OK(compressor->Compress(input_len, input, output_buffer.size(),
+ output_buffer.data(), &bytes_read,
&bytes_written));
+ input += bytes_read;
+ input_len -= bytes_read;
+ compressed_size += bytes_written;
+ if (compressed_data != nullptr && bytes_written > 0) {
+ compressed_data->resize(compressed_data->size() + bytes_written);
+ memcpy(compressed_data->data() + compressed_data->size() - bytes_written,
+ output_buffer.data(), bytes_written);
+ }
+ if (bytes_read == 0) {
+ // Need to enlarge output buffer
+ output_buffer.resize(output_buffer.size() * 2);
+ }
+ }
+ while (true) {
+ bool should_retry;
+ int64_t bytes_written;
+ ABORT_NOT_OK(compressor->End(output_buffer.size(), output_buffer.data(),
+ &bytes_written, &should_retry));
+ compressed_size += bytes_written;
+ if (compressed_data != nullptr && bytes_written > 0) {
+ compressed_data->resize(compressed_data->size() + bytes_written);
+ memcpy(compressed_data->data() + compressed_data->size() - bytes_written,
+ output_buffer.data(), bytes_written);
+ }
+ if (should_retry) {
+ // Need to enlarge output buffer
+ output_buffer.resize(output_buffer.size() * 2);
+ } else {
+ break;
+ }
+ }
+ return compressed_size;
+}
+
+static void BM_StreamingCompression(
+ Compression::type compression, const std::vector<uint8_t>& data,
+ benchmark::State& state) { // NOLINT non-const reference
+ std::unique_ptr<Codec> codec;
+ ABORT_NOT_OK(Codec::Create(compression, &codec));
+
+ while (state.KeepRunning()) {
+ int64_t compressed_size = StreamingCompress(codec.get(), data);
+ state.counters["ratio"] =
+ static_cast<double>(data.size()) /
static_cast<double>(compressed_size);
+ }
+ state.SetBytesProcessed(state.iterations() * data.size());
+}
+
+template <Compression::type COMPRESSION>
+static void BM_StreamingCompression(
+ benchmark::State& state) { // NOLINT non-const
reference
+ auto data = MakeCompressibleData(8 * 1024 * 1024); // 8 MB
+
+ BM_StreamingCompression(COMPRESSION, data, state);
+}
+
+static void BM_StreamingDecompression(
+ Compression::type compression, const std::vector<uint8_t>& data,
+ benchmark::State& state) { // NOLINT non-const reference
+ std::unique_ptr<Codec> codec;
+ ABORT_NOT_OK(Codec::Create(compression, &codec));
+
+ std::vector<uint8_t> compressed_data;
+ ARROW_UNUSED(StreamingCompress(codec.get(), data, &compressed_data));
+ state.counters["ratio"] =
+ static_cast<double>(data.size()) /
static_cast<double>(compressed_data.size());
+
+ while (state.KeepRunning()) {
+ std::shared_ptr<Decompressor> decompressor;
+ ABORT_NOT_OK(codec->MakeDecompressor(&decompressor));
+
+ const uint8_t* input = compressed_data.data();
+ int64_t input_len = compressed_data.size();
+ int64_t decompressed_size = 0;
+
+ std::vector<uint8_t> output_buffer(1 << 20); // 1 MB
+ while (!decompressor->IsFinished()) {
+ int64_t bytes_read, bytes_written;
+ bool need_more_output;
+ ABORT_NOT_OK(decompressor->Decompress(input_len, input,
output_buffer.size(),
+ output_buffer.data(), &bytes_read,
+ &bytes_written,
&need_more_output));
+ input += bytes_read;
+ input_len -= bytes_read;
+ decompressed_size += bytes_written;
+ if (need_more_output) {
+ // Enlarge output buffer
+ output_buffer.resize(output_buffer.size() * 2);
+ }
+ }
+ ARROW_CHECK(decompressed_size == static_cast<int64_t>(data.size()));
+ }
+ state.SetBytesProcessed(state.iterations() * data.size());
+}
+
+template <Compression::type COMPRESSION>
+static void BM_StreamingDecompression(
+ benchmark::State& state) { // NOLINT non-const
reference
+ auto data = MakeCompressibleData(8 * 1024 * 1024); // 8 MB
+
+ BM_StreamingDecompression(COMPRESSION, data, state);
+}
+
+BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::GZIP)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::BROTLI)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::ZSTD)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+BENCHMARK_TEMPLATE(BM_StreamingCompression, Compression::LZ4)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+
+BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::GZIP)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::BROTLI)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::ZSTD)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+BENCHMARK_TEMPLATE(BM_StreamingDecompression, Compression::LZ4)
+ ->Unit(benchmark::kMillisecond)
+ ->Repetitions(1);
+
+} // namespace util
+} // namespace arrow