ffacs commented on code in PR #1916:
URL: https://github.com/apache/orc/pull/1916#discussion_r1582236120
##########
c++/src/Compression.cc:
##########
@@ -187,45 +161,81 @@ namespace orc {
virtual bool Next(void** data, int* size) override;
virtual std::string getName() const override = 0;
+ virtual void BackUp(int count) override;
+ virtual void suppress() override;
+ virtual uint64_t flush() override;
protected:
// return total compressed size
virtual uint64_t doStreamingCompression() = 0;
+
+ // Buffer to hold uncompressed data until user calls Next()
+ BlockBuffer rawInputBuffer;
};
+ void CompressionStream::BackUp(int count) {
+ uint64_t backup = static_cast<uint64_t>(count);
+ uint64_t currSize = rawInputBuffer.size();
+ if (backup > currSize) {
+ throw std::logic_error("Can't backup that much!");
+ }
+ rawInputBuffer.resize(currSize - backup);
+ }
+
+ uint64_t CompressionStream::flush() {
+ void* data;
+ int size;
+ if (!Next(&data, &size)) {
+ throw std::runtime_error("Failed to flush compression buffer.");
+ }
+ BufferedOutputStream::BackUp(outputSize - outputPosition);
+ rawInputBuffer.resize(0);
+ outputSize = outputPosition = 0;
+ return BufferedOutputStream::flush();
+ }
+
+ void CompressionStream::suppress() {
+ outputBuffer = nullptr;
+ outputPosition = outputSize = 0;
+ rawInputBuffer.resize(0);
+ BufferedOutputStream::suppress();
+ }
+
CompressionStream::CompressionStream(OutputStream* outStream, int
compressionLevel,
uint64_t capacity, uint64_t blockSize,
MemoryPool& pool,
WriterMetrics* metrics)
- : CompressionStreamBase(outStream, compressionLevel, capacity,
blockSize, pool, metrics) {
+ : CompressionStreamBase(outStream, compressionLevel, capacity,
blockSize, pool, metrics),
+ rawInputBuffer(pool, blockSize) {
// PASS
}
bool CompressionStream::Next(void** data, int* size) {
- if (bufferSize != 0) {
+ if (rawInputBuffer.size() != 0) {
Review Comment:
Maybe we can leverage the flexibility of BlockBuffer here. That is, we can
compress only when the size of rawInputBuffer reaches the compression block
buffer size.
##########
c++/src/BlockBuffer.hh:
##########
@@ -106,18 +106,39 @@ namespace orc {
}
void resize(uint64_t size);
+
+ uint64_t getBlockSize(uint64_t blockId) const;
+
/**
* Requests the BlockBuffer to contain at least newCapacity bytes.
* Reallocation happens if there is need of more space.
* @param newCapacity new capacity of BlockBuffer
*/
void reserve(uint64_t newCapacity);
+
/**
* Write the BlockBuffer content into OutputStream
* @param output the output stream to write to
* @param metrics the metrics of the writer
*/
void writeTo(OutputStream* output, WriterMetrics* metrics);
+
+ /**
+ * Get the block data by block index
+ * @param blockIndex the index of blocks
+ * @param data the pointer to the block data
+ * @param size the size of the block data
+ * @return true if the block data is successfully retrieved
+ */
+ bool getBlockData(uint64_t blockIndex, void** data, uint64_t* size) const;
// add ut
+
+ /**
+ * Get next availiable memory block
+ * @param data the pointer to the block data
+ * @param size the size of the block data
+ * @return true if the block data is successfully retrieved
+ */
+ bool requestBuffer(void** buffer, int* size); // add ut
Review Comment:
Ditto: please remove the trailing `// add ut` comment at the end of the line.
##########
c++/src/Compression.cc:
##########
@@ -260,31 +270,53 @@ namespace orc {
throw std::runtime_error("Failed to reset inflate.");
}
- strm_.avail_in = static_cast<unsigned int>(bufferSize);
- strm_.next_in = rawInputBuffer.data();
+ // reset output buffer
+ if (outputPosition != 0) {
+ BufferedOutputStream::BackUp(outputSize - outputPosition);
Review Comment:
Sorry, but I don't understand why we reset the output buffer here.
##########
c++/src/BlockBuffer.hh:
##########
@@ -106,18 +106,39 @@ namespace orc {
}
void resize(uint64_t size);
+
+ uint64_t getBlockSize(uint64_t blockId) const;
+
/**
* Requests the BlockBuffer to contain at least newCapacity bytes.
* Reallocation happens if there is need of more space.
* @param newCapacity new capacity of BlockBuffer
*/
void reserve(uint64_t newCapacity);
+
/**
* Write the BlockBuffer content into OutputStream
* @param output the output stream to write to
* @param metrics the metrics of the writer
*/
void writeTo(OutputStream* output, WriterMetrics* metrics);
+
+ /**
+ * Get the block data by block index
+ * @param blockIndex the index of blocks
+ * @param data the pointer to the block data
+ * @param size the size of the block data
+ * @return true if the block data is successfully retrieved
+ */
+ bool getBlockData(uint64_t blockIndex, void** data, uint64_t* size) const;
// add ut
Review Comment:
Please remove the comment at the end of the line.
##########
c++/src/Compression.cc:
##########
@@ -161,6 +133,8 @@ namespace orc {
offset += currentSize;
outputPosition += currentSize;
}
+ BufferedOutputStream::BackUp(outputSize - outputPosition);
Review Comment:
Why should we do this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]