[
https://issues.apache.org/jira/browse/ORC-192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016520#comment-16016520
]
ASF GitHub Bot commented on ORC-192:
------------------------------------
Github user majetideepak commented on a diff in the pull request:
https://github.com/apache/orc/pull/122#discussion_r117355319
--- Diff: c++/src/Compression.cc ---
@@ -33,6 +33,254 @@
namespace orc {
+  /**
+   * Common state for output streams that compress data in chunks.
+   * Callers fill the uncompressed buffer handed out by Next(); subclasses
+   * write the compressed result into the underlying BufferedOutputStream,
+   * each chunk prefixed by a 3-byte compression header (see writeHeader()).
+   */
+  class CompressionStreamBase: public BufferedOutputStream {
+  public:
+    CompressionStreamBase(OutputStream * outStream,
+                          int compressionLevel,
+                          uint64_t capacity,
+                          uint64_t blockSize,
+                          MemoryPool& pool);
+
+    virtual bool Next(void** data, int*size) override = 0;
+    virtual void BackUp(int count) override;
+
+    virtual std::string getName() const override = 0;
+    virtual uint64_t flush() override;
+
+    virtual bool isCompressed() const override { return true; }
+    virtual uint64_t getSize() const override;
+
+  protected:
+    /**
+     * Write the 3-byte chunk header into buffer[0..2]. The header is the
+     * little-endian 24-bit value (compressedSize * 2 + original).
+     * @param buffer destination; must have room for 3 bytes
+     * @param compressedSize length of the chunk body following the header
+     * @param original true if the chunk body is stored uncompressed
+     * @throws std::logic_error if compressedSize does not fit in 23 bits
+     */
+    void writeHeader(char * buffer, size_t compressedSize, bool original) {
+      // One of the 24 header bits is the "original" flag, leaving 23 bits
+      // for the length; a larger value would silently corrupt the header.
+      if (compressedSize > 0x7FFFFF) {
+        throw std::logic_error("Compression chunk too large for header.");
+      }
+      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
+      buffer[1] = static_cast<char>(compressedSize >> 7);
+      buffer[2] = static_cast<char>(compressedSize >> 15);
+    }
+
+    // Uncompressed bytes handed to the caller by Next(); they are
+    // compressed on the following Next() (or flush()).
+    DataBuffer<unsigned char> rawInputBuffer;
+
+    // Compression level passed to the codec
+    int level;
+
+    // Current output chunk obtained from BufferedOutputStream::Next()
+    char * outputBuffer;
+
+    // Number of valid uncompressed bytes in rawInputBuffer (after BackUp())
+    int bufferSize;
+
+    // Write offset of the next compressed byte within outputBuffer
+    int outputPosition;
+
+    // Size of outputBuffer as returned by BufferedOutputStream::Next()
+    int outputSize;
+  };
+
+  /**
+   * Set up the underlying buffered stream and a one-block raw input buffer.
+   * No output chunk is acquired here; outputBuffer stays null until there
+   * is data to compress.
+   */
+  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
+                                               int compressionLevel,
+                                               uint64_t capacity,
+                                               uint64_t blockSize,
+                                               MemoryPool& pool)
+    : BufferedOutputStream(pool, outStream, capacity, blockSize)
+    , rawInputBuffer(pool, blockSize)
+    , level(compressionLevel)
+    , outputBuffer(nullptr)
+    , bufferSize(0)
+    , outputPosition(0)
+    , outputSize(0) {
+  }
+
+  /**
+   * Return the last count bytes of the buffer handed out by Next() as unused.
+   * @throws std::logic_error if count is negative or larger than the number
+   *         of bytes currently handed out.
+   */
+  void CompressionStreamBase::BackUp(int count) {
+    // A negative count would grow bufferSize beyond rawInputBuffer's size,
+    // making the next compression read past the buffer.
+    if (count < 0) {
+      throw std::logic_error("Can't backup a negative number of bytes!");
+    }
+    if (count > bufferSize) {
+      throw std::logic_error("Can't backup that much!");
+    }
+    bufferSize -= count;
+  }
+
+  /**
+   * Emit any pending raw data as a compressed chunk, give the unused tail of
+   * the current output chunk back to the buffered stream, reset the chunk
+   * bookkeeping and flush the underlying stream.
+   * @return whatever BufferedOutputStream::flush() returns
+   * @throws std::logic_error if Next() reports failure
+   */
+  uint64_t CompressionStreamBase::flush() {
+    // The buffer Next() hands back is not needed here.
+    void * ignoredData;
+    int ignoredSize;
+    const bool produced = Next(&ignoredData, &ignoredSize);
+    if (!produced) {
+      throw std::logic_error("Failed to flush compression buffer.");
+    }
+
+    const int unusedTail = outputSize - outputPosition;
+    BufferedOutputStream::BackUp(unusedTail);
+
+    outputPosition = 0;
+    outputSize = 0;
+    bufferSize = 0;
+
+    return BufferedOutputStream::flush();
+  }
+
+  /**
+   * Size of the written output: the buffered stream's size minus the part of
+   * the current output chunk that has not been filled yet.
+   */
+  uint64_t CompressionStreamBase::getSize() const {
+    const uint64_t unfilled =
+      static_cast<uint64_t>(outputSize - outputPosition);
+    return BufferedOutputStream::getSize() - unfilled;
+  }
+
+  /**
+   * Streaming compression base class. Each Next() call compresses the data
+   * handed out by the previous Next() into the output stream as one chunk;
+   * subclasses provide the codec through doStreamingCompression().
+   */
+  class CompressionStream: public CompressionStreamBase {
+  public:
+    CompressionStream(OutputStream * outStream,
+                      int compressionLevel,
+                      uint64_t capacity,
+                      uint64_t blockSize,
+                      MemoryPool& pool);
+
+    virtual bool Next(void** data, int*size) override;
+    virtual std::string getName() const override = 0;
+
+  protected:
+    /**
+     * Compress the first bufferSize bytes of rawInputBuffer, writing from
+     * outputBuffer + outputPosition onward.
+     * @return total number of compressed bytes written for this chunk
+     */
+    virtual uint64_t doStreamingCompression() = 0;
+  };
+
+  /**
+   * Forward every argument unchanged to CompressionStreamBase; this class
+   * adds no state of its own.
+   */
+  CompressionStream::CompressionStream(OutputStream * outStream,
+                                       int compressionLevel,
+                                       uint64_t capacity,
+                                       uint64_t blockSize,
+                                       MemoryPool& pool)
+    : CompressionStreamBase(outStream, compressionLevel, capacity, blockSize,
+                            pool) {
+  }
+
+  /**
+   * If the caller holds raw data from a previous call (bufferSize != 0),
+   * write it to the output as one chunk: a 3-byte header followed by the
+   * compressed bytes, or by the raw bytes when compression does not shrink
+   * them. Then hand the whole rawInputBuffer back to the caller.
+   * Always returns true; failures are thrown as std::logic_error.
+   */
+  bool CompressionStream::Next(void** data, int*size) {
+    if (bufferSize != 0) {
+      // adjust 3 bytes for the compression header
+      // NOTE(review): if fewer than 3 bytes remain, the header straddles into
+      // the next chunk (newPosition > 0). The header pointer computed below
+      // then points before the new outputBuffer, which seems to assume that
+      // consecutive BufferedOutputStream::Next() chunks are contiguous in
+      // memory -- confirm against BufferedOutputStream.
+      if (outputPosition + 3 >= outputSize) {
+        int newPosition = outputPosition + 3 - outputSize;
+        if (!BufferedOutputStream::Next(
+          reinterpret_cast<void **>(&outputBuffer),
+          &outputSize)) {
+          throw std::logic_error(
+            "Failed to get next output buffer from output stream.");
+        }
+        outputPosition = newPosition;
+      } else {
+        outputPosition += 3;
+      }
+
+      uint64_t totalCompressedSize = doStreamingCompression();
+
+      // The reserved header lies directly before the compressed bytes just
+      // written (same contiguity assumption as above if they span chunks).
+      char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
+      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
+        // Compression did not help: overwrite the compressed bytes with the
+        // raw input, mark the chunk "original", and give back the surplus.
+        writeHeader(header, static_cast<size_t>(bufferSize), true);
+        memcpy(
+          header + 3,
+          rawInputBuffer.data(),
+          static_cast<size_t>(bufferSize));
+
+        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
+        BufferedOutputStream::BackUp(backup);
+        // NOTE(review): outputPosition can go negative here when the
+        // compressed data spilled into a newly acquired chunk.
+        outputPosition -= backup;
+        outputSize -= backup;
+      } else {
+        writeHeader(header, totalCompressedSize, false);
+      }
+    }
+
+    // Hand out the full raw buffer; BackUp() may later shrink bufferSize.
+    *data = rawInputBuffer.data();
+    *size = static_cast<int>(rawInputBuffer.size());
+    bufferSize = *size;
+
+    return true;
+  }
+
+  /**
+   * Compression stream backed by zlib's deflate. init() configures it with
+   * windowBits -15 (raw deflate, no zlib header/trailer).
+   */
+  class ZlibCompressionStream: public CompressionStream {
+  public:
+    ZlibCompressionStream(OutputStream * outStream,
+                          int compressionLevel,
+                          uint64_t capacity,
+                          uint64_t blockSize,
+                          MemoryPool& pool);
+
+    // Release the zlib state allocated by deflateInit2() in init();
+    // without this the internal deflate buffers leak.
+    virtual ~ZlibCompressionStream() {
+      static_cast<void>(deflateEnd(&strm));
+    }
+
+    // strm owns zlib-internal allocations; a copy would share them and
+    // deflateEnd() would then run twice on the same state.
+    ZlibCompressionStream(const ZlibCompressionStream&) = delete;
+    ZlibCompressionStream& operator=(const ZlibCompressionStream&) = delete;
+
+    virtual std::string getName() const override;
+
+  protected:
+    virtual uint64_t doStreamingCompression() override;
+
+  private:
+    void init();
+    z_stream strm;
+  };
+
+  /**
+   * Forward the stream configuration to CompressionStream, then set up the
+   * deflate state via init(); init() throws if zlib initialization fails.
+   */
+  ZlibCompressionStream::ZlibCompressionStream(OutputStream * outStream,
+                                               int compressionLevel,
+                                               uint64_t capacity,
+                                               uint64_t blockSize,
+                                               MemoryPool& pool)
+    : CompressionStream(outStream, compressionLevel, capacity, blockSize,
+                        pool) {
+    init();
+  }
+
+  /**
+   * Deflate the first bufferSize bytes of rawInputBuffer into the output
+   * stream as one complete deflate stream. New output chunks are acquired
+   * from BufferedOutputStream whenever the current one is full.
+   * @return total number of compressed bytes produced for this chunk
+   * @throws std::logic_error on any zlib or output-stream failure
+   */
+  uint64_t ZlibCompressionStream::doStreamingCompression() {
+    // Reuse the deflate state set up by init() for every chunk.
+    if (deflateReset(&strm) != Z_OK) {
+      throw std::logic_error("Failed to reset deflate.");
+    }
+
+    strm.avail_in = static_cast<unsigned int>(bufferSize);
+    strm.next_in = rawInputBuffer.data();
+
+    // Keep calling deflate(Z_FINISH) until it reports Z_STREAM_END. Looping
+    // on the return code, rather than on avail_out == 0, guarantees we never
+    // return a truncated stream.
+    for (;;) {
+      if (outputPosition >= outputSize) {
+        if (!BufferedOutputStream::Next(
+          reinterpret_cast<void **>(&outputBuffer),
+          &outputSize)) {
+          throw std::logic_error(
+            "Failed to get next output buffer from output stream.");
+        }
+        outputPosition = 0;
+      }
+      strm.next_out = reinterpret_cast<unsigned char *>
+        (outputBuffer + outputPosition);
+      strm.avail_out = static_cast<unsigned int>
+        (outputSize - outputPosition);
+
+      int ret = deflate(&strm, Z_FINISH);
+      outputPosition = outputSize - static_cast<int>(strm.avail_out);
+
+      if (ret == Z_STREAM_END) {
+        break;
+      }
+      if (ret != Z_OK) {
+        throw std::logic_error("Failed to deflate input data.");
+      }
+      // Z_OK: more output is pending; loop, grabbing a new chunk if full.
+    }
+
+    return strm.total_out;
+  }
+
+  // Identifier of this stream implementation.
+  std::string ZlibCompressionStream::getName() const {
+    const std::string name("ZlibCompressionStream");
+    return name;
+  }
+
+DIAGNOSTIC_PUSH
+DIAGNOSTIC_IGNORE("-Wold-style-cast")
+
+ void ZlibCompressionStream::init() {
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+
+ if (deflateInit2(&strm, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY)
+ != Z_OK) {
+ throw std::logic_error("Error while calling deflateInit2() for
zlib.");
--- End diff --
Use `ParseError` here instead of `std::logic_error`.
> Zlib compression stream
> -----------------------
>
> Key: ORC-192
> URL: https://issues.apache.org/jira/browse/ORC-192
> Project: ORC
> Issue Type: Sub-task
> Components: C++
> Reporter: Xiening Dai
> Assignee: Xiening Dai
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)