This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 63166a8 ORC-639: [C++] Improve zstd compression performance
63166a8 is described below
commit 63166a877d4c2e7bd19a88897450e72e5d1498c3
Author: Ion Gaztañaga <[email protected]>
AuthorDate: Wed Jun 10 06:14:11 2020 +0200
ORC-639: [C++] Improve zstd compression performance
Zstd's "Simple API" is currently used, which initializes the compression/decompression
context on every call. This includes the time to initialize all
compression/decompression tables. Using the "Explicit context" API improves the
performance significantly as compression/decompression tables are constructed
once and reused each time "decompress" or "doBlockCompression" is called. We've
noticed more than 15% time improvement in some applications with this change,
so it seems that any ORC user using Zstd [...]
This fixes #511
---
c++/src/Compression.cc | 86 +++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 75 insertions(+), 11 deletions(-)
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 4f55821..c36eaeb 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -911,12 +911,16 @@ DIAGNOSTIC_POP
capacity,
blockSize,
pool) {
- // PASS
+ this->init();
}
virtual std::string getName() const override {
return "ZstdCompressionStream";
}
+
+ virtual ~ZSTDCompressionStream() override {
+ this->end();
+ }
protected:
virtual uint64_t doBlockCompression() override;
@@ -924,15 +928,43 @@ DIAGNOSTIC_POP
virtual uint64_t estimateMaxCompressionSize() override {
return ZSTD_compressBound(static_cast<size_t>(bufferSize));
}
+
+ private:
+ void init();
+ void end();
+ ZSTD_CCtx *cctx;
};
uint64_t ZSTDCompressionStream::doBlockCompression() {
- return ZSTD_compress(compressorBuffer.data(),
- compressorBuffer.size(),
- rawInputBuffer.data(),
- static_cast<size_t>(bufferSize),
- level);
+ return ZSTD_compressCCtx(cctx,
+ compressorBuffer.data(),
+ compressorBuffer.size(),
+ rawInputBuffer.data(),
+ static_cast<size_t>(bufferSize),
+ level);
}
+
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+ DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+ void ZSTDCompressionStream::init() {
+
+ cctx = ZSTD_createCCtx();
+ if (!cctx) {
+ throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd.");
+ }
+ }
+
+
+ void ZSTDCompressionStream::end() {
+ (void)ZSTD_freeCCtx(cctx);
+ cctx = nullptr;
+ }
+
+DIAGNOSTIC_PUSH
/**
* ZSTD block decompression
@@ -945,7 +977,11 @@ DIAGNOSTIC_POP
: BlockDecompressionStream(std::move(inStream),
blockSize,
_pool) {
- // PASS
+ this->init();
+ }
+
+ virtual ~ZSTDDecompressionStream() override {
+ this->end();
}
std::string getName() const override {
@@ -959,18 +995,46 @@ DIAGNOSTIC_POP
uint64_t length,
char *output,
size_t maxOutputLength) override;
+
+ private:
+ void init();
+ void end();
+ ZSTD_DCtx *dctx;
};
uint64_t ZSTDDecompressionStream::decompress(const char *inputPtr,
uint64_t length,
char *output,
size_t maxOutputLength) {
- return static_cast<uint64_t>(ZSTD_decompress(output,
- maxOutputLength,
- inputPtr,
- length));
+ return static_cast<uint64_t>(ZSTD_decompressDCtx(dctx,
+ output,
+ maxOutputLength,
+ inputPtr,
+ length));
+ }
+
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+ DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+ void ZSTDDecompressionStream::init() {
+
+ dctx = ZSTD_createDCtx();
+ if (!dctx) {
+ throw std::runtime_error("Error while calling ZSTD_createDCtx() for zstd.");
+ }
}
+
+ void ZSTDDecompressionStream::end() {
+ (void)ZSTD_freeDCtx(dctx);
+ dctx = nullptr;
+ }
+
+DIAGNOSTIC_PUSH
+
std::unique_ptr<BufferedOutputStream>
createCompressor(
CompressionKind kind,