[ 
https://issues.apache.org/jira/browse/ORC-192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016517#comment-16016517
 ] 

ASF GitHub Bot commented on ORC-192:
------------------------------------

Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117355282
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = newPosition;
    +      } else {
    +        outputPosition += 3;
    +      }
    +
    +      uint64_t totalCompressedSize = doStreamingCompression();
    +
    +      char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
    +      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
    +        writeHeader(header, static_cast<size_t>(bufferSize), true);
    +        memcpy(
    +          header + 3,
    +          rawInputBuffer.data(),
    +          static_cast<size_t>(bufferSize));
    +
    +        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
    +        BufferedOutputStream::BackUp(backup);
    +        outputPosition -= backup;
    +        outputSize -= backup;
    +      } else {
    +        writeHeader(header, totalCompressedSize, false);
    +      }
    +    }
    +
    +    *data = rawInputBuffer.data();
    +    *size = static_cast<int>(rawInputBuffer.size());
    +    bufferSize = *size;
    +
    +    return true;
    +  }
    +
    +  class ZlibCompressionStream: public CompressionStream {
    +  public:
    +    ZlibCompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual std::string getName() const override;
    +
    +  protected:
    +    virtual uint64_t doStreamingCompression() override;
    +
    +  private:
    +    void init();
    +    z_stream strm;
    +  };
    +
    +  ZlibCompressionStream::ZlibCompressionStream(
    +                        OutputStream * outStream,
    +                        int compressionLevel,
    +                        uint64_t capacity,
    +                        uint64_t blockSize,
    +                        MemoryPool& pool)
    +                        : CompressionStream(outStream,
    +                                            compressionLevel,
    +                                            capacity,
    +                                            blockSize,
    +                                            pool) {
    +    init();
    +  }
    +
    +  uint64_t ZlibCompressionStream::doStreamingCompression() {
    +    if (deflateReset(&strm) != Z_OK) {
    +      throw std::logic_error("Failed to reset inflate.");
    +    }
    +
    +    strm.avail_in = static_cast<unsigned int>(bufferSize);
    +    strm.next_in = rawInputBuffer.data();
    +
    +    do {
    +      if (outputPosition >= outputSize) {
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = 0;
    +      }
    +      strm.next_out = reinterpret_cast<unsigned char *>
    +      (outputBuffer + outputPosition);
    +      strm.avail_out = static_cast<unsigned int>
    +      (outputSize - outputPosition);
    +
    +      int ret = deflate(&strm, Z_FINISH);
    +      outputPosition = outputSize - static_cast<int>(strm.avail_out);
    +
    +      if (ret == Z_STREAM_END) {
    +        break;
    +      } else if (ret == Z_OK) {
    +        // needs more buffer so will continue the loop
    +      } else {
    +        throw std::logic_error("Failed to deflate input data.");
    --- End diff --
    
    Use `ParseError` here instead of `std::logic_error`.


> Zlib compression stream
> -----------------------
>
>                 Key: ORC-192
>                 URL: https://issues.apache.org/jira/browse/ORC-192
>             Project: ORC
>          Issue Type: Sub-task
>          Components: C++
>            Reporter: Xiening Dai
>            Assignee: Xiening Dai
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to