Repository: incubator-impala Updated Branches: refs/heads/master 304edb28c -> f3d8ccdf0
IMPALA-5532: Stack-allocate compressors in RowBatch (de)serialization Change the allocation pattern for Codec objects in RowBatch so that they are stack-allocated. Make the constructors and Init() methods of the codec implementations public in order to do so. Fix a bit-rotting bug in row-batch-serialize-benchmark that made it abort on startup. Change-Id: I6641f4a08bd2711c4f4515ab29a6e5418cbd5f51 Reviewed-on: http://gerrit.cloudera.org:8080/7478 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f3d8ccdf Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f3d8ccdf Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f3d8ccdf Branch: refs/heads/master Commit: f3d8ccdf0f19b0b4077df517cf604a863c55bb37 Parents: 304edb2 Author: Henry Robinson <[email protected]> Authored: Sun Jun 18 22:19:05 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Mon Jul 24 09:38:24 2017 +0000 ---------------------------------------------------------------------- .../benchmarks/row-batch-serialize-benchmark.cc | 29 +++--- be/src/experiments/compression-test.cc | 4 +- be/src/runtime/row-batch.cc | 29 +++--- be/src/util/codec.cc | 27 +++--- be/src/util/codec.h | 40 +++++---- be/src/util/compress.cc | 33 ++++--- be/src/util/compress.h | 90 +++++++++---------- be/src/util/decompress-test.cc | 3 +- be/src/util/decompress.cc | 90 ++++++++++--------- be/src/util/decompress.h | 93 +++++++++++--------- be/src/util/runtime-profile.cc | 1 + 11 files changed, 223 insertions(+), 216 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/benchmarks/row-batch-serialize-benchmark.cc ---------------------------------------------------------------------- diff --git 
a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc index 0099260..350252d 100644 --- a/be/src/benchmarks/row-batch-serialize-benchmark.cc +++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc @@ -23,6 +23,7 @@ #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" #include "runtime/row-batch.h" +#include "runtime/string-value.h" #include "runtime/tuple-row.h" #include "service/fe-support.h" #include "service/frontend.h" @@ -31,6 +32,7 @@ #include "util/compress.h" #include "util/cpu-info.h" #include "util/decompress.h" +#include "util/scope-exit-trigger.h" #include "common/names.h" @@ -115,18 +117,21 @@ class RowBatchSerializeBaseline { if (size > 0) { // Try compressing tuple_data to compression_scratch_, swap if compressed data is // smaller - scoped_ptr<Codec> compressor; - Status status = Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4, - &compressor); + Lz4Compressor compressor(nullptr, false); + Status status = compressor.Init(); DCHECK(status.ok()) << status.GetDetail(); + auto compressor_cleanup = + MakeScopeExitTrigger([&compressor]() { compressor.Close(); }); - int64_t compressed_size = compressor->MaxOutputLen(size); + int64_t compressed_size = compressor.MaxOutputLen(size); if (batch->compression_scratch_.size() < compressed_size) { batch->compression_scratch_.resize(compressed_size); } uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str(); uint8_t* compressed_output = (uint8_t*)batch->compression_scratch_.c_str(); - compressor->ProcessBlock(true, size, input, &compressed_size, &compressed_output); + status = + compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output); + DCHECK(status.ok()) << status.GetDetail(); if (LIKELY(compressed_size < size)) { batch->compression_scratch_.resize(compressed_size); output_batch->tuple_data.swap(batch->compression_scratch_); @@ -193,18 +198,18 @@ class RowBatchSerializeBaseline { uint8_t* compressed_data = 
(uint8_t*)input_batch.tuple_data.c_str(); size_t compressed_size = input_batch.tuple_data.size(); - scoped_ptr<Codec> decompressor; - Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type, - &decompressor); + Lz4Decompressor decompressor(nullptr, false); + Status status = decompressor.Init(); DCHECK(status.ok()) << status.GetDetail(); + auto compressor_cleanup = + MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); }); int64_t uncompressed_size = input_batch.uncompressed_size; DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed"; tuple_data = batch->tuple_data_pool()->Allocate(uncompressed_size); - status = decompressor->ProcessBlock(true, compressed_size, compressed_data, - &uncompressed_size, &tuple_data); + status = decompressor.ProcessBlock( + true, compressed_size, compressed_data, &uncompressed_size, &tuple_data); DCHECK(status.ok()) << "RowBatch decompression failed."; - decompressor->Close(); } else { // Tuple data uncompressed, copy directly into data pool tuple_data = batch->tuple_data_pool()->Allocate(input_batch.tuple_data.size()); @@ -321,8 +326,6 @@ class RowBatchSerializeBenchmark { } static void Run() { - CpuInfo::Init(); - MemTracker tracker; MemPool mem_pool(&tracker); ObjectPool obj_pool; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/experiments/compression-test.cc ---------------------------------------------------------------------- diff --git a/be/src/experiments/compression-test.cc b/be/src/experiments/compression-test.cc index cedd30a..f6b87bb 100644 --- a/be/src/experiments/compression-test.cc +++ b/be/src/experiments/compression-test.cc @@ -69,7 +69,8 @@ void TestCompression(int num, int min_len, int max_len, THdfsCompression::type c } scoped_ptr<Codec> compressor; - Codec::CreateCompressor(NULL, false, codec, &compressor); + Status status = Codec::CreateCompressor(NULL, false, codec, &compressor); + DCHECK(status.ok()); int64_t compressed_len = 
compressor->MaxOutputLen(offset); uint8_t* compressed_buffer = (uint8_t*)malloc(compressed_len); @@ -102,4 +103,3 @@ int main(int argc, char **argv) { impala::TestCompression(1000000, 5, 15, impala::THdfsCompression::GZIP); return 0; } - http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 9bb96aa..fbe1b94 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -30,6 +30,7 @@ #include "util/debug-util.h" #include "util/decompress.h" #include "util/fixed-size-hash-table.h" +#include "util/scope-exit-trigger.h" #include "common/names.h" @@ -97,22 +98,24 @@ RowBatch::RowBatch( } uint8_t* tuple_data; if (input_batch.compression_type != THdfsCompression::NONE) { + DCHECK_EQ(THdfsCompression::LZ4, input_batch.compression_type) + << "Unexpected compression type: " << input_batch.compression_type; // Decompress tuple data into data pool uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str(); size_t compressed_size = input_batch.tuple_data.size(); - scoped_ptr<Codec> decompressor; - Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type, - &decompressor); + Lz4Decompressor decompressor(nullptr, false); + Status status = decompressor.Init(); DCHECK(status.ok()) << status.GetDetail(); + auto compressor_cleanup = + MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); }); int64_t uncompressed_size = input_batch.uncompressed_size; DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed"; tuple_data = tuple_data_pool_.Allocate(uncompressed_size); - status = decompressor->ProcessBlock(true, compressed_size, compressed_data, - &uncompressed_size, &tuple_data); + status = decompressor.ProcessBlock( + true, compressed_size, compressed_data, &uncompressed_size, &tuple_data); DCHECK(status.ok()) << "RowBatch 
decompression failed."; - decompressor->Close(); } else { // Tuple data uncompressed, copy directly into data pool tuple_data = tuple_data_pool_.Allocate(input_batch.tuple_data.size()); @@ -205,18 +208,20 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) { if (size > 0) { // Try compressing tuple_data to compression_scratch_, swap if compressed data is // smaller - scoped_ptr<Codec> compressor; - RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4, - &compressor)); + Lz4Compressor compressor(nullptr, false); + RETURN_IF_ERROR(compressor.Init()); + auto compressor_cleanup = + MakeScopeExitTrigger([&compressor]() { compressor.Close(); }); - int64_t compressed_size = compressor->MaxOutputLen(size); + int64_t compressed_size = compressor.MaxOutputLen(size); if (compression_scratch_.size() < compressed_size) { compression_scratch_.resize(compressed_size); } uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str(); uint8_t* compressed_output = (uint8_t*)compression_scratch_.c_str(); - RETURN_IF_ERROR(compressor->ProcessBlock(true, size, input, &compressed_size, - &compressed_output)); + RETURN_IF_ERROR( + compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output)); + if (LIKELY(compressed_size < size)) { compression_scratch_.resize(compressed_size); output_batch->tuple_data.swap(compression_scratch_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/codec.cc ---------------------------------------------------------------------- diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc index 8c5cc90..c76fa44 100644 --- a/be/src/util/codec.cc +++ b/be/src/util/codec.cc @@ -17,19 +17,15 @@ #include "util/codec.h" -#include <boost/assign/list_of.hpp> -#include <limits> // for std::numeric_limits #include <gutil/strings/substitute.h> #include "util/compress.h" #include "util/decompress.h" -#include "gen-cpp/CatalogObjects_types.h" #include "gen-cpp/CatalogObjects_constants.h" 
#include "common/names.h" -using boost::assign::map_list_of; using namespace impala; using namespace strings; @@ -43,12 +39,11 @@ const char* const Codec::UNKNOWN_CODEC_ERROR = const char* const NO_LZO_MSG = "LZO codecs may not be created via the Codec interface. " "Instead the LZO library is directly invoked."; -const Codec::CodecMap Codec::CODEC_MAP = map_list_of - ("", THdfsCompression::NONE) - (DEFAULT_COMPRESSION, THdfsCompression::DEFAULT) - (GZIP_COMPRESSION, THdfsCompression::GZIP) - (BZIP2_COMPRESSION, THdfsCompression::BZIP2) - (SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED); +const Codec::CodecMap Codec::CODEC_MAP = {{"", THdfsCompression::NONE}, + {DEFAULT_COMPRESSION, THdfsCompression::DEFAULT}, + {GZIP_COMPRESSION, THdfsCompression::GZIP}, + {BZIP2_COMPRESSION, THdfsCompression::BZIP2}, + {SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED}}; string Codec::GetCodecName(THdfsCompression::type type) { for (const CodecMap::value_type& codec: g_CatalogObjects_constants.COMPRESSION_MAP) { @@ -85,7 +80,7 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, THdfsCompression::type format, scoped_ptr<Codec>* compressor) { switch (format) { case THdfsCompression::NONE: - compressor->reset(NULL); + compressor->reset(nullptr); return Status::OK(); case THdfsCompression::GZIP: compressor->reset(new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse)); @@ -133,7 +128,7 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse, THdfsCompression::type format, scoped_ptr<Codec>* decompressor) { switch (format) { case THdfsCompression::NONE: - decompressor->reset(NULL); + decompressor->reset(nullptr); return Status::OK(); case THdfsCompression::DEFAULT: case THdfsCompression::GZIP: @@ -166,17 +161,15 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse, Codec::Codec(MemPool* mem_pool, bool reuse_buffer, bool supports_streaming) : memory_pool_(mem_pool), reuse_buffer_(reuse_buffer), - out_buffer_(NULL), - buffer_length_(0), 
supports_streaming_(supports_streaming) { - if (memory_pool_ != NULL) { + if (memory_pool_ != nullptr) { temp_memory_pool_.reset(new MemPool(memory_pool_->mem_tracker())); } } void Codec::Close() { - if (temp_memory_pool_.get() != NULL) { - DCHECK(memory_pool_ != NULL); + if (temp_memory_pool_.get() != nullptr) { + DCHECK(memory_pool_ != nullptr); memory_pool_->AcquireData(temp_memory_pool_.get(), false); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/codec.h ---------------------------------------------------------------------- diff --git a/be/src/util/codec.h b/be/src/util/codec.h index 910cf55..9475ec1 100644 --- a/be/src/util/codec.h +++ b/be/src/util/codec.h @@ -22,11 +22,10 @@ #include "common/status.h" #include <boost/scoped_ptr.hpp> -#include "gen-cpp/Descriptors_types.h" -namespace impala { +#include "runtime/mem-pool.h" -class MemPool; +namespace impala { /// Create a compression object. This is the base class for all compression algorithms. A /// compression algorithm is either a compressor or a decompressor. To add a new @@ -61,14 +60,15 @@ class Codec { /// format: the type of decompressor to create. /// Output: /// decompressor: scoped pointer to the decompressor class to use. - /// If mem_pool is NULL, then the resulting codec will never allocate memory and + /// If mem_pool is nullptr, then the resulting codec will never allocate memory and /// the caller must be responsible for it. static Status CreateDecompressor(MemPool* mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr<Codec>* decompressor); /// Alternate factory method: takes a codec string and populates a scoped pointer. static Status CreateDecompressor(MemPool* mem_pool, bool reuse, - const std::string& codec, boost::scoped_ptr<Codec>* decompressor); + const std::string& codec, + boost::scoped_ptr<Codec>* decompressor) WARN_UNUSED_RESULT; /// Create a compressor. 
/// Input: @@ -78,11 +78,12 @@ class Codec { /// Output: /// compressor: scoped pointer to the compressor class to use. static Status CreateCompressor(MemPool* mem_pool, bool reuse, - THdfsCompression::type format, boost::scoped_ptr<Codec>* compressor); + THdfsCompression::type format, + boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT; /// Alternate factory method: takes a codec string and populates a scoped pointer. - static Status CreateCompressor(MemPool* mem_pool, bool reuse, - const std::string& codec, boost::scoped_ptr<Codec>* compressor); + static Status CreateCompressor(MemPool* mem_pool, bool reuse, const std::string& codec, + boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT; /// Return the name of a compression algorithm. static std::string GetCodecName(THdfsCompression::type); @@ -91,6 +92,9 @@ class Codec { virtual ~Codec() {} + /// Initialize the codec. This should only be called once. + virtual Status Init() WARN_UNUSED_RESULT { return Status::OK(); } + /// Process a block of data, either compressing or decompressing it. // /// If output_preallocated is true, *output_length must be the length of *output and data @@ -111,7 +115,7 @@ class Codec { /// not int64_ts. We need to keep this interface because the Parquet thrift uses ints. /// See IMPALA-1116. Status ProcessBlock32(bool output_preallocated, int input_length, const uint8_t* input, - int* output_length, uint8_t** output); + int* output_length, uint8_t** output) WARN_UNUSED_RESULT; /// Process data like ProcessBlock(), but can consume partial input and may only produce /// partial output. *input_bytes_read returns the number of bytes of input that have @@ -132,7 +136,8 @@ class Codec { /// output: decompressed data /// stream_end: end of output buffer corresponds to the end of a compressed stream. 
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input, - int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end) { + int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, + bool* stream_end) WARN_UNUSED_RESULT { return Status("Not implemented."); } @@ -141,7 +146,7 @@ class Codec { /// a buffer. /// This must be an O(1) operation (i.e. cannot read all of input). Codecs that /// don't support this should return -1. - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL) = 0; + virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = nullptr) = 0; /// Must be called on codec before destructor for final cleanup. virtual void Close(); @@ -156,16 +161,13 @@ class Codec { protected: /// Create a compression operator /// Inputs: - /// mem_pool: memory pool to allocate the output buffer. If mem_pool is NULL then the - /// caller must always preallocate *output in ProcessBlock(). + /// mem_pool: memory pool to allocate the output buffer. If mem_pool is nullptr then + /// the caller must always preallocate *output in ProcessBlock(). /// reuse_buffer: if false always allocate a new buffer rather than reuse. Codec(MemPool* mem_pool, bool reuse_buffer, bool supports_streaming = false); - /// Initialize the codec. This should only be called once. - virtual Status Init() = 0; - /// Pool to allocate the buffer to hold transformed data. - MemPool* memory_pool_; + MemPool* memory_pool_ = nullptr; /// Temporary memory pool: in case we get the output size too small we can use this to /// free unused buffers. @@ -176,10 +178,10 @@ class Codec { /// Buffer to hold transformed data. /// Either passed from the caller or allocated from memory_pool_. - uint8_t* out_buffer_; + uint8_t* out_buffer_ = nullptr; /// Length of the output buffer. - int64_t buffer_length_; + int64_t buffer_length_ = 0; /// Can decompressor support streaming mode. 
/// This is set to true for codecs that implement ProcessBlockStreaming(). http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/compress.cc ---------------------------------------------------------------------- diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc index efa39bf..90656aa 100644 --- a/be/src/util/compress.cc +++ b/be/src/util/compress.cc @@ -16,18 +16,17 @@ // under the License. #include "util/compress.h" -#include "exec/read-write-util.h" -#include "runtime/runtime-state.h" -// Codec libraries -#include <zlib.h> #include <bzlib.h> +#include <zlib.h> +#include <boost/crc.hpp> +#include <gutil/strings/substitute.h> #undef DISALLOW_COPY_AND_ASSIGN // Snappy redefines this. #include <snappy.h> #include <lz4.h> -#include <boost/crc.hpp> -#include <gutil/strings/substitute.h> +#include "exec/read-write-util.h" +#include "runtime/mem-pool.h" #include "common/names.h" @@ -95,8 +94,8 @@ Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input, if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { if (ret == Z_OK) { // will return Z_OK (and stream_.msg NOT set) if stream_.avail_out is too small - return Status(Substitute("zlib deflate failed: output buffer ($0) is too small.", - output_length).c_str()); + return Status(Substitute( + "zlib deflate failed: output buffer ($0) is too small.", output_length)); } stringstream ss; ss << "zlib deflate failed: " << stream_.msg; @@ -118,8 +117,8 @@ Status GzipCompressor::ProcessBlock(bool output_preallocated, DCHECK(!output_preallocated || (output_preallocated && *output_length > 0)); int64_t max_compressed_len = MaxOutputLen(input_length); if (!output_preallocated) { - if (!reuse_buffer_ || buffer_length_ < max_compressed_len || out_buffer_ == NULL) { - DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool"; + if (!reuse_buffer_ || buffer_length_ < max_compressed_len || out_buffer_ == nullptr) { + DCHECK(memory_pool_ != nullptr) << 
"Can't allocate without passing in a mem pool"; buffer_length_ = max_compressed_len; out_buffer_ = memory_pool_->Allocate(buffer_length_); } @@ -144,15 +143,15 @@ int64_t BzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) { Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t *output_length, uint8_t** output) { - // The bz2 library does not allow input to be NULL, even when input_length is 0. This + // The bz2 library does not allow input to be nullptr, even when input_length is 0. This // should be OK because we do not write any file formats that support bzip compression. - DCHECK(input != NULL); + DCHECK(input != nullptr); DCHECK_GE(input_length, 0); if (output_preallocated) { buffer_length_ = *output_length; out_buffer_ = *output; - } else if (!reuse_buffer_ || out_buffer_ == NULL) { + } else if (!reuse_buffer_ || out_buffer_ == nullptr) { // guess that we will need no more the input length. buffer_length_ = input_length; out_buffer_ = temp_memory_pool_->Allocate(buffer_length_); @@ -161,7 +160,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng unsigned int outlen = static_cast<unsigned int>(buffer_length_); int ret = BZ_OUTBUFF_FULL; while (ret == BZ_OUTBUFF_FULL) { - if (out_buffer_ == NULL) { + if (out_buffer_ == nullptr) { DCHECK(!output_preallocated); temp_memory_pool_->Clear(); buffer_length_ = buffer_length_ * 2; @@ -174,7 +173,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng if (output_preallocated) { return Status("Too small buffer passed to BzipCompressor"); } - out_buffer_ = NULL; + out_buffer_ = nullptr; } } if (ret != BZ_OK) { @@ -230,7 +229,7 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated, if (output_preallocated) { buffer_length_ = *output_length; out_buffer_ = *output; - } else if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < length) { + } else if 
(!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < length) { buffer_length_ = length; out_buffer_ = memory_pool_->Allocate(buffer_length_); } @@ -276,7 +275,7 @@ Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_le if (!output_preallocated) { if ((!reuse_buffer_ || buffer_length_ < max_compressed_len)) { - DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool"; + DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool"; buffer_length_ = max_compressed_len; out_buffer_ = memory_pool_->Allocate(buffer_length_); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/compress.h ---------------------------------------------------------------------- diff --git a/be/src/util/compress.h b/be/src/util/compress.h index bd38fc3..15d9b92 100644 --- a/be/src/util/compress.h +++ b/be/src/util/compress.h @@ -23,11 +23,11 @@ #include <zlib.h> #include "util/codec.h" -#include "exec/hdfs-scanner.h" -#include "runtime/mem-pool.h" namespace impala { +class MemPool; + /// Different compression classes. The classes all expose the same API and /// abstracts the underlying calls to the compression libraries. 
/// TODO: reconsider the abstracted API @@ -41,18 +41,19 @@ class GzipCompressor : public Codec { GZIP, }; + GzipCompressor(Format format, MemPool* mem_pool = nullptr, bool reuse_buffer = false); virtual ~GzipCompressor(); - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); + + virtual Status Init() override WARN_UNUSED_RESULT; + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; - virtual std::string file_extension() const { return "gz"; } + virtual std::string file_extension() const override { return "gz"; } private: - friend class Codec; - GzipCompressor(Format format, MemPool* mem_pool = NULL, bool reuse_buffer = false); - virtual Status Init(); - Format format_; /// Structure used to communicate with the library. @@ -66,73 +67,68 @@ class GzipCompressor : public Codec { /// at least big enough. /// *output_length should be called with the length of the output buffer and on return /// is the length of the output. 
- Status Compress(int64_t input_length, const uint8_t* input, - int64_t* output_length, uint8_t* output); + Status Compress(int64_t input_length, const uint8_t* input, int64_t* output_length, + uint8_t* output) WARN_UNUSED_RESULT; }; class BzipCompressor : public Codec { public: + BzipCompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~BzipCompressor() { } - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); - virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); - virtual std::string file_extension() const { return "bz2"; } - private: - friend class Codec; - BzipCompressor(MemPool* mem_pool, bool reuse_buffer); - virtual Status Init() { return Status::OK(); } + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; + virtual std::string file_extension() const override { return "bz2"; } }; class SnappyBlockCompressor : public Codec { public: + SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~SnappyBlockCompressor() { } - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); - virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); - virtual std::string file_extension() const { return "snappy"; } - private: - friend class Codec; - SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer); - virtual Status Init() { return Status::OK(); } + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; + 
virtual std::string file_extension() const override { return "snappy"; } }; class SnappyCompressor : public Codec { public: + SnappyCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false); virtual ~SnappyCompressor() { } - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); + + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); - virtual std::string file_extension() const { return "snappy"; } + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; + virtual std::string file_extension() const override { return "snappy"; } /// Computes the crc checksum that snappy expects when used in a framing format. /// This checksum needs to come after the compressed data. /// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt static uint32_t ComputeChecksum(int64_t input_len, const uint8_t* input); - - private: - friend class Codec; - SnappyCompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false); - virtual Status Init() { return Status::OK(); } }; -/// Lz4 is a compression codec with similar compression ratios as snappy -/// but much faster decompression. This compressor is not able to compress -/// unless the output buffer is allocated and will cause an error if -/// asked to do so. +/// Lz4 is a compression codec with similar compression ratios as snappy but much faster +/// decompression. This compressor is not able to compress unless the output buffer is +/// allocated and will cause an error if asked to do so. 
class Lz4Compressor : public Codec { public: + Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false); virtual ~Lz4Compressor() { } - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); - virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); - virtual std::string file_extension() const { return "lz4"; } - private: - friend class Codec; - Lz4Compressor(MemPool* mem_pool = NULL, bool reuse_buffer = false); - virtual Status Init() { return Status::OK(); } + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; + virtual std::string file_extension() const override { return "lz4"; } }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc index 1f84bad..1e8760d 100644 --- a/be/src/util/decompress-test.cc +++ b/be/src/util/decompress-test.cc @@ -400,7 +400,8 @@ TEST_F(DecompressorTest, Impala1506) { MemTracker trax; MemPool pool(&trax); scoped_ptr<Codec> compressor; - Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor); + EXPECT_OK( + Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor)); int64_t input_len = 3; const uint8_t input[3] = {1, 2, 3}; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress.cc ---------------------------------------------------------------------- diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc index 506ddd7..2488586 100644 --- a/be/src/util/decompress.cc +++ b/be/src/util/decompress.cc @@ -15,13 
+15,7 @@ // specific language governing permissions and limitations // under the License. -#include <boost/assign/list_of.hpp> #include "util/decompress.h" -#include "exec/read-write-util.h" -#include "runtime/mem-tracker.h" -#include "runtime/runtime-state.h" -#include "common/logging.h" -#include "gen-cpp/Descriptors_types.h" // Codec libraries #include <zlib.h> @@ -30,12 +24,18 @@ #include <snappy.h> #include <lz4.h> +#include "common/logging.h" +#include "exec/read-write-util.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" + #include "common/names.h" using namespace impala; using namespace strings; -const string DECOMPRESSOR_MEM_LIMIT_EXCEEDED = "$0Decompressor failed to allocate $1 bytes."; +const string DECOMPRESSOR_MEM_LIMIT_EXCEEDED = + "$0Decompressor failed to allocate $1 bytes."; GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate) : Codec(mem_pool, reuse_buffer, true), @@ -77,13 +77,14 @@ string GzipDecompressor::DebugStreamState() const { Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input, int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end) { - if (!reuse_buffer_ || out_buffer_ == NULL) { + if (!reuse_buffer_ || out_buffer_ == nullptr) { buffer_length_ = STREAM_OUT_BUF_SIZE; out_buffer_ = memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip", buffer_length_); - return memory_pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_length_); + return memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } } *output = out_buffer_; @@ -139,7 +140,7 @@ Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8 Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t* 
output_length, uint8_t** output) { if (UNLIKELY(output_preallocated && *output_length == 0)) { - // The zlib library does not allow *output to be NULL, even when output_length is 0 + // The zlib library does not allow *output to be nullptr, even when output_length is 0 // (inflate() will return Z_STREAM_ERROR). We don't consider this an error, so bail // early if no output is expected. Note that we don't signal an error if the input // actually contains compressed data. @@ -148,15 +149,15 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le bool use_temp = false; if (!output_preallocated) { - if (!reuse_buffer_ || out_buffer_ == NULL) { + if (!reuse_buffer_ || out_buffer_ == nullptr) { // guess that we will need 2x the input length. buffer_length_ = input_length * 2; out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip", buffer_length_); - return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL, - details, buffer_length_); + return temp_memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } } use_temp = true; @@ -205,11 +206,11 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le temp_memory_pool_->Clear(); buffer_length_ *= 2; out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip", buffer_length_); - return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL, - details, buffer_length_); + return temp_memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } *output = out_buffer_; *output_length = buffer_length_; @@ -221,7 +222,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le } else if (ret 
!= Z_STREAM_END) { stringstream ss; ss << "GzipDecompressor failed: "; - if (stream_.msg != NULL) ss << stream_.msg; + if (stream_.msg != nullptr) ss << stream_.msg; return Status(ss.str()); } @@ -265,15 +266,15 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le if (output_preallocated) { buffer_length_ = *output_length; out_buffer_ = *output; - } else if (!reuse_buffer_ || out_buffer_ == NULL) { + } else if (!reuse_buffer_ || out_buffer_ == nullptr) { // guess that we will need 2x the input length. buffer_length_ = input_length * 2; out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip", buffer_length_); - return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL, - details, buffer_length_); + return temp_memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } use_temp = true; } @@ -283,16 +284,16 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le // TODO: IMPALA-3073 Verify if compressed block could be multistream. If yes, we need // to support it and shouldn't stop decompressing while ret == BZ_STREAM_END. 
while (ret == BZ_OUTBUFF_FULL) { - if (out_buffer_ == NULL) { + if (out_buffer_ == nullptr) { DCHECK(!output_preallocated); temp_memory_pool_->Clear(); buffer_length_ = buffer_length_ * 2; out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip", buffer_length_); - return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL, - details, buffer_length_); + return temp_memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } } outlen = static_cast<unsigned int>(buffer_length_); @@ -302,7 +303,7 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le if (output_preallocated) { return Status("Too small a buffer passed to BzipDecompressor"); } - out_buffer_ = NULL; + out_buffer_ = nullptr; } } @@ -344,14 +345,14 @@ string BzipDecompressor::DebugStreamState() const { Status BzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input, int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end) { - if (!reuse_buffer_ || out_buffer_ == NULL) { + if (!reuse_buffer_ || out_buffer_ == nullptr) { buffer_length_ = STREAM_OUT_BUF_SIZE; out_buffer_ = memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip", buffer_length_); - return memory_pool_->mem_tracker()->MemLimitExceeded(NULL, - details, buffer_length_); + return memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } } *output = out_buffer_; @@ -493,16 +494,16 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i const uint8_t* input, int64_t* output_len, uint8_t** output) { if (!output_preallocated) { // If we don't know the size beforehand, compute it. 
- RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, NULL)); - if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < *output_len) { + RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, nullptr)); + if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < *output_len) { // Need to allocate a new buffer buffer_length_ = *output_len; out_buffer_ = memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "SnappyBlock", buffer_length_); - return memory_pool_->mem_tracker()->MemLimitExceeded(NULL, - details, buffer_length_); + return memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } } *output = out_buffer_; @@ -518,7 +519,7 @@ SnappyDecompressor::SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer) } int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) { - DCHECK(input != NULL); + DCHECK(input != nullptr); size_t result; if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input), input_len, &result)) { @@ -535,14 +536,15 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_ } if (!output_preallocated) { - if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) { + if (!reuse_buffer_ || out_buffer_ == nullptr + || buffer_length_ < uncompressed_length) { buffer_length_ = uncompressed_length; out_buffer_ = memory_pool_->TryAllocate(buffer_length_); - if (UNLIKELY(out_buffer_ == NULL)) { + if (UNLIKELY(out_buffer_ == nullptr)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy", buffer_length_); - return memory_pool_->mem_tracker()->MemLimitExceeded(NULL, - details, buffer_length_); + return memory_pool_->mem_tracker()->MemLimitExceeded( + nullptr, details, buffer_length_); } } *output = out_buffer_; @@ -568,14 +570,14 @@ 
Lz4Decompressor::Lz4Decompressor(MemPool* mem_pool, bool reuse_buffer) } int64_t Lz4Decompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) { - DCHECK(input != NULL) << "Passed null input to Lz4 Decompressor"; + DCHECK(input != nullptr) << "Passed null input to Lz4 Decompressor"; return -1; } Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t* output_length, uint8_t** output) { DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output"; - // LZ4_decompress_fast will cause a segmentation fault if passed a NULL output. + // LZ4_decompress_fast will cause a segmentation fault if passed a nullptr output. if(*output_length == 0) return Status::OK(); if (LZ4_decompress_fast(reinterpret_cast<const char*>(input), reinterpret_cast<char*>(*output), *output_length) != input_length) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress.h ---------------------------------------------------------------------- diff --git a/be/src/util/decompress.h b/be/src/util/decompress.h index 3c402d9..61c5994 100644 --- a/be/src/util/decompress.h +++ b/be/src/util/decompress.h @@ -24,27 +24,33 @@ #include <bzlib.h> #include "util/codec.h" -#include "exec/hdfs-scanner.h" -#include "runtime/mem-pool.h" namespace impala { +class MemPool; + class GzipDecompressor : public Codec { public: + GzipDecompressor( + MemPool* mem_pool = nullptr, bool reuse_buffer = false, bool is_deflate = false); virtual ~GzipDecompressor(); - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); + + virtual Status Init() override WARN_UNUSED_RESULT; + + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); + const uint8_t* input, int64_t* output_length, + uint8_t** output) 
override WARN_UNUSED_RESULT; + virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input, int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, - bool* stream_end); - virtual std::string file_extension() const { return "gz"; } + bool* stream_end) override WARN_UNUSED_RESULT; + + virtual std::string file_extension() const override { return "gz"; } private: - friend class Codec; - GzipDecompressor( - MemPool* mem_pool = NULL, bool reuse_buffer = false, bool is_deflate = false); - virtual Status Init(); std::string DebugStreamState() const; /// If set assume deflate format, otherwise zlib or gzip @@ -59,19 +65,21 @@ class GzipDecompressor : public Codec { class BzipDecompressor : public Codec { public: + BzipDecompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~BzipDecompressor(); - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); - virtual Status ProcessBlock(bool output_preallocated, - int64_t input_length, const uint8_t* input, - int64_t* output_length, uint8_t** output); + + virtual Status Init() override WARN_UNUSED_RESULT; + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input, - int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end); - virtual std::string file_extension() const { return "bz2"; } - private: - friend class Codec; - BzipDecompressor(MemPool* mem_pool, bool reuse_buffer); + int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, + bool* stream_end) override WARN_UNUSED_RESULT; + virtual std::string file_extension() const override { return "bz2"; } - virtual Status Init(); + private: std::string DebugStreamState() const; /// Used for streaming 
decompression. @@ -84,16 +92,15 @@ class SnappyDecompressor : public Codec { /// doesn't expect this. static const uint TRAILING_CHECKSUM_LEN = 4; + SnappyDecompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false); virtual ~SnappyDecompressor() { } - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); - virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); - virtual std::string file_extension() const { return "snappy"; } - private: - friend class Codec; - SnappyDecompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false); - virtual Status Init() { return Status::OK(); } + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; + virtual std::string file_extension() const override { return "snappy"; } }; /// Lz4 is a compression codec with similar compression ratios as snappy but much faster @@ -102,29 +109,27 @@ class SnappyDecompressor : public Codec { class Lz4Decompressor : public Codec { public: virtual ~Lz4Decompressor() { } - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); - virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); - virtual std::string file_extension() const { return "lz4"; } + Lz4Decompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false); - private: - friend class Codec; - Lz4Decompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false); - virtual Status Init() { return Status::OK(); } + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, + const uint8_t* input, 
int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; + virtual std::string file_extension() const override { return "lz4"; } }; class SnappyBlockDecompressor : public Codec { public: + SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer); virtual ~SnappyBlockDecompressor() { } - virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL); - virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, - const uint8_t* input, int64_t* output_length, uint8_t** output); - virtual std::string file_extension() const { return "snappy"; } - private: - friend class Codec; - SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer); - virtual Status Init() { return Status::OK(); } + virtual int64_t MaxOutputLen( + int64_t input_len, const uint8_t* input = nullptr) override; + virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, + const uint8_t* input, int64_t* output_length, + uint8_t** output) override WARN_UNUSED_RESULT; + virtual std::string file_extension() const override { return "snappy"; } }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/runtime-profile.cc ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index bbc3088..ec31fc9 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -31,6 +31,7 @@ #include "util/container-util.h" #include "util/debug-util.h" #include "util/periodic-counter-updater.h" +#include "util/pretty-printer.h" #include "util/redactor.h" #include "common/names.h"
