Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117354764
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 
: 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     
outStream,
    +                                                                     
capacity,
    +                                                                     
blockSize),
    +                                                rawInputBuffer(pool, 
blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               
compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = newPosition;
    +      } else {
    +        outputPosition += 3;
    +      }
    +
    +      uint64_t totalCompressedSize = doStreamingCompression();
    +
    +      char * header = outputBuffer + outputPosition - totalCompressedSize 
- 3;
    +      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
    +        writeHeader(header, static_cast<size_t>(bufferSize), true);
    +        memcpy(
    +          header + 3,
    +          rawInputBuffer.data(),
    +          static_cast<size_t>(bufferSize));
    +
    +        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
    +        BufferedOutputStream::BackUp(backup);
    +        outputPosition -= backup;
    +        outputSize -= backup;
    +      } else {
    +        writeHeader(header, totalCompressedSize, false);
    +      }
    +    }
    +
    +    *data = rawInputBuffer.data();
    +    *size = static_cast<int>(rawInputBuffer.size());
    +    bufferSize = *size;
    +
    +    return true;
    +  }
    +
    +  class ZlibCompressionStream: public CompressionStream {
    +  public:
    +    ZlibCompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual std::string getName() const override;
    +
    +  protected:
    +    virtual uint64_t doStreamingCompression() override;
    +
    +  private:
    +    void init();
    +    z_stream strm;
    +  };
    +
    +  ZlibCompressionStream::ZlibCompressionStream(
    +                        OutputStream * outStream,
    +                        int compressionLevel,
    +                        uint64_t capacity,
    +                        uint64_t blockSize,
    +                        MemoryPool& pool)
    +                        : CompressionStream(outStream,
    +                                            compressionLevel,
    +                                            capacity,
    +                                            blockSize,
    +                                            pool) {
    +    init();
    +  }
    +
    +  uint64_t ZlibCompressionStream::doStreamingCompression() {
    +    if (deflateReset(&strm) != Z_OK) {
    +      throw std::logic_error("Failed to reset inflate.");
    --- End diff --
    
    The decompression logic throws `ParseError` when a library call fails.
    We should do the same here, and the message should refer to deflate
    (not inflate), since the failing call is `deflateReset`:
    `ParseError("ZlibCompressionStream failed to reset deflate")`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to