wgtmac commented on code in PR #1932:
URL: https://github.com/apache/orc/pull/1932#discussion_r1604256957
##########
c++/src/Compression.cc:
##########
@@ -1203,8 +1237,8 @@ namespace orc {
case CompressionKind_ZLIB: {
int level =
(strategy == CompressionStrategy_SPEED) ? Z_BEST_SPEED + 1 :
Z_DEFAULT_COMPRESSION;
- return std::make_unique<ZlibCompressionStream>(outStream, level,
bufferCapacity,
- compressionBlockSize,
pool, metrics);
+ return std::make_unique<ZlibCompressionStream>(
+ outStream, level, bufferCapacity, compressionBlockSize,
memoryBlockSize, pool, metrics);
Review Comment:
If `memoryBlockSize` is only used for streaming compressors, we should rename
it to make that clear. However, I think we also need to support block compressors,
since ZSTD (which is block-based) is the default. This can be done in a separate PR.
##########
c++/src/Compression.hh:
##########
@@ -45,12 +45,10 @@ namespace orc {
* @param compressionBlockSize compression buffer block size
* @param pool the memory pool
*/
- std::unique_ptr<BufferedOutputStream> createCompressor(CompressionKind kind,
- OutputStream*
outStream,
- CompressionStrategy
strategy,
- uint64_t
bufferCapacity,
- uint64_t
compressionBlockSize,
- MemoryPool& pool,
WriterMetrics* metrics);
+ std::unique_ptr<BufferedOutputStream> createCompressor(
+ CompressionKind kind, OutputStream* outStream, CompressionStrategy
strategy,
+ uint64_t bufferCapacity, uint64_t compressionBlockSize, uint64_t
memoryBlockSize,
Review Comment:
Please update the doc comment above to keep it in sync with the new signature (e.g. document `memoryBlockSize`).
##########
c++/src/Compression.cc:
##########
@@ -155,20 +162,27 @@ namespace orc {
class CompressionStream : public CompressionStreamBase {
public:
CompressionStream(OutputStream* outStream, int compressionLevel, uint64_t
capacity,
- uint64_t blockSize, MemoryPool& pool, WriterMetrics*
metrics);
+ uint64_t compressionBlockSize, uint64_t memoryBlockSize,
MemoryPool& pool,
+ WriterMetrics* metrics);
virtual bool Next(void** data, int* size) override;
virtual std::string getName() const override = 0;
virtual void BackUp(int count) override;
virtual void suppress() override;
virtual uint64_t flush() override;
+ uint64_t getRawInputBufferSize() const override {
+ return rawInputBuffer.size();
+ }
protected:
// return total compressed size
virtual uint64_t doStreamingCompression() = 0;
// Buffer to hold uncompressed data until user calls Next()
BlockBuffer rawInputBuffer;
+
+ // compress with raw fallback
Review Comment:
This comment seems unnecessary.
##########
c++/src/Compression.hh:
##########
@@ -45,12 +45,10 @@ namespace orc {
* @param compressionBlockSize compression buffer block size
* @param pool the memory pool
*/
- std::unique_ptr<BufferedOutputStream> createCompressor(CompressionKind kind,
- OutputStream*
outStream,
- CompressionStrategy
strategy,
- uint64_t
bufferCapacity,
- uint64_t
compressionBlockSize,
- MemoryPool& pool,
WriterMetrics* metrics);
+ std::unique_ptr<BufferedOutputStream> createCompressor(
+ CompressionKind kind, OutputStream* outStream, CompressionStrategy
strategy,
+ uint64_t bufferCapacity, uint64_t compressionBlockSize, uint64_t
memoryBlockSize,
Review Comment:
Now it seems that several different concepts are mixed here. It would be helpful
to explain the distinctions among these buffers and blocks.
##########
c++/src/Compression.cc:
##########
@@ -232,6 +244,22 @@ namespace orc {
}
rawInputBuffer.resize(0);
}
+ }
+
+ bool CompressionStream::Next(void** data, int* size) {
+ if (rawInputBuffer.size() > compressionBlockSize) {
+ std::stringstream ss;
+ ss << "uncompressed data size " << rawInputBuffer.size() << " is larger
than block size "
+ << compressionBlockSize
+ << ". compressionBlockSize should be set equal to multiply of "
+ "memoryBlockSize";
+ throw std::logic_error(ss.str());
+ }
+
+ // triggle compress when rawInputBuffer is reach the capacity
Review Comment:
```suggestion
// compress data in the rawInputBuffer when it is full
```
##########
c++/src/Compression.cc:
##########
@@ -200,14 +210,16 @@ namespace orc {
}
CompressionStream::CompressionStream(OutputStream* outStream, int
compressionLevel,
- uint64_t capacity, uint64_t blockSize,
MemoryPool& pool,
+ uint64_t capacity, uint64_t
compressionBlockSize,
+ uint64_t memoryBlockSize, MemoryPool&
pool,
WriterMetrics* metrics)
- : CompressionStreamBase(outStream, compressionLevel, capacity,
blockSize, pool, metrics),
- rawInputBuffer(pool, blockSize) {
+ : CompressionStreamBase(outStream, compressionLevel, capacity,
compressionBlockSize,
+ memoryBlockSize, pool, metrics),
+ rawInputBuffer(pool, memoryBlockSize) {
// PASS
}
- bool CompressionStream::Next(void** data, int* size) {
+ void CompressionStream::compressWithRawFallback() {
Review Comment:
What does raw fallback mean? Should we just name it `compressInternal`?
##########
c++/src/Compression.cc:
##########
@@ -232,6 +244,22 @@ namespace orc {
}
rawInputBuffer.resize(0);
}
+ }
+
+ bool CompressionStream::Next(void** data, int* size) {
+ if (rawInputBuffer.size() > compressionBlockSize) {
+ std::stringstream ss;
+ ss << "uncompressed data size " << rawInputBuffer.size() << " is larger
than block size "
+ << compressionBlockSize
+ << ". compressionBlockSize should be set equal to multiply of "
Review Comment:
```suggestion
<< ". compressionBlockSize should be set equal to multiple of "
```
##########
c++/include/orc/Writer.hh:
##########
@@ -277,6 +277,17 @@ namespace orc {
* @return if not set, return default value which is 1 MB.
*/
uint64_t getOutputBufferCapacity() const;
+
+ /**
+ * Set the initial block size of input buffer in the class
CompressionStream.
Review Comment:
Could we add more comments to let readers know how this differs from
`setOutputBufferCapacity` above? Perhaps we also need to update the comment on
`setOutputBufferCapacity`. Users rely on these public methods to tune their use
cases.
##########
c++/src/Compression.cc:
##########
@@ -232,6 +244,22 @@ namespace orc {
}
rawInputBuffer.resize(0);
}
+ }
+
+ bool CompressionStream::Next(void** data, int* size) {
+ if (rawInputBuffer.size() > compressionBlockSize) {
+ std::stringstream ss;
+ ss << "uncompressed data size " << rawInputBuffer.size() << " is larger
than block size "
+ << compressionBlockSize
+ << ". compressionBlockSize should be set equal to multiply of "
+ "memoryBlockSize";
+ throw std::logic_error(ss.str());
Review Comment:
Should we add a new exception type?
##########
c++/src/Compression.cc:
##########
@@ -232,6 +244,22 @@ namespace orc {
}
rawInputBuffer.resize(0);
}
+ }
+
+ bool CompressionStream::Next(void** data, int* size) {
+ if (rawInputBuffer.size() > compressionBlockSize) {
+ std::stringstream ss;
+ ss << "uncompressed data size " << rawInputBuffer.size() << " is larger
than block size "
+ << compressionBlockSize
+ << ". compressionBlockSize should be set equal to multiply of "
Review Comment:
BTW, the error message should contain only useful information. The error
occurs when the size exceeds the threshold, so the message should not mention the
"multiple of memoryBlockSize" requirement. That is something we should check when
creating the compression stream.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]