This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 63166a8  ORC-639: [C++] Improve zstd compression performance
63166a8 is described below

commit 63166a877d4c2e7bd19a88897450e72e5d1498c3
Author: Ion Gaztañaga <[email protected]>
AuthorDate: Wed Jun 10 06:14:11 2020 +0200

    ORC-639: [C++] Improve zstd compression performance
    
    Zstd's "Simple API" is currently used, which initializes the
    compression/decompression context on every call. This includes the time
    needed to initialize all compression/decompression tables. Using the
    "Explicit context" API improves performance significantly, because the
    compression/decompression tables are constructed once and reused each
    time "decompress" or "doBlockCompression" is called. We've noticed more
    than a 15% time improvement in some applications with this change, so it
    seems that any ORC user using Zstd [...]
    
    This fixes #511
---
 c++/src/Compression.cc | 86 +++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 75 insertions(+), 11 deletions(-)

diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 4f55821..c36eaeb 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -911,12 +911,16 @@ DIAGNOSTIC_POP
                                                    capacity,
                                                    blockSize,
                                                    pool) {
-      // PASS
+      this->init();
     }
 
     virtual std::string getName() const override {
       return "ZstdCompressionStream";
     }
+    
+    virtual ~ZSTDCompressionStream() override {
+      this->end();  // frees the zstd context that init() created in the constructor
+    }
 
   protected:
     virtual uint64_t doBlockCompression() override;
@@ -924,15 +928,43 @@ DIAGNOSTIC_POP
     virtual uint64_t estimateMaxCompressionSize() override {
       return ZSTD_compressBound(static_cast<size_t>(bufferSize));
     }
+    
+  private:
+    void init();
+    void end();
+    ZSTD_CCtx *cctx;
   };
 
   uint64_t ZSTDCompressionStream::doBlockCompression() {
-    return ZSTD_compress(compressorBuffer.data(),
-                         compressorBuffer.size(),
-                         rawInputBuffer.data(),
-                         static_cast<size_t>(bufferSize),
-                         level);
+    return ZSTD_compressCCtx(cctx,  // context created once in init(), reused on every call
+                             compressorBuffer.data(),
+                             compressorBuffer.size(),
+                             rawInputBuffer.data(),
+                             static_cast<size_t>(bufferSize),
+                             level);  // NOTE(review): zstd's result is returned unchecked -- confirm the caller tests ZSTD_isError()
   }
+  
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+  DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+  // Creates the zstd compression context that doBlockCompression() reuses; throws std::runtime_error if ZSTD_createCCtx() returns null.
+  void ZSTDCompressionStream::init() {
+    cctx = ZSTD_createCCtx();
+    if (!cctx) {
+      throw std::runtime_error("Error while calling ZSTD_createCCtx() for zstd.");
+    }
+  }
+
+  // Frees the compression context (ignoring zstd's return value) and resets the pointer.
+  void ZSTDCompressionStream::end() {
+    (void)ZSTD_freeCCtx(cctx);
+    cctx = nullptr;
+  }
+
+DIAGNOSTIC_POP
 
   /**
    * ZSTD block decompression
@@ -945,7 +977,11 @@ DIAGNOSTIC_POP
                             : BlockDecompressionStream(std::move(inStream),
                                                        blockSize,
                                                        _pool) {
-      // PASS
+      this->init();
+    }
+
+    virtual ~ZSTDDecompressionStream() override {
+      this->end();  // frees the zstd context that init() created in the constructor
     }
 
     std::string getName() const override {
@@ -959,18 +995,46 @@ DIAGNOSTIC_POP
                                 uint64_t length,
                                 char *output,
                                 size_t maxOutputLength) override;
+
+  private:
+    void init();
+    void end();
+    ZSTD_DCtx *dctx;
   };
 
   uint64_t ZSTDDecompressionStream::decompress(const char *inputPtr,
                                                uint64_t length,
                                                char *output,
                                                size_t maxOutputLength) {
-    return static_cast<uint64_t>(ZSTD_decompress(output,
-                                                 maxOutputLength,
-                                                 inputPtr,
-                                                 length));
+    return static_cast<uint64_t>(ZSTD_decompressDCtx(dctx,  // context created once in init(), reused on every call
+                                                     output,
+                                                     maxOutputLength,
+                                                     inputPtr,
+                                                     length));  // NOTE(review): zstd's result is cast and returned unchecked -- confirm the caller tests ZSTD_isError()
+  }
+
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+  DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+  // Creates the zstd decompression context that decompress() reuses; throws std::runtime_error if ZSTD_createDCtx() returns null.
+  void ZSTDDecompressionStream::init() {
+    dctx = ZSTD_createDCtx();
+    if (!dctx) {
+      throw std::runtime_error("Error while calling ZSTD_createDCtx() for zstd.");
+    }
   }
 
+  // Frees the decompression context (ignoring zstd's return value) and resets the pointer.
+  void ZSTDDecompressionStream::end() {
+    (void)ZSTD_freeDCtx(dctx);
+    dctx = nullptr;
+  }
+
+DIAGNOSTIC_POP
+
   std::unique_ptr<BufferedOutputStream>
      createCompressor(
                       CompressionKind kind,

Reply via email to