Repository: incubator-impala Updated Branches: refs/heads/master 79205bb33 -> b55ec3f64
IMPALA-5742: De-allocate buffer in parquet-reader on exit Testing: ran with ASAN with detect_leaks=1. Leak does not reproduce with fix. Change-Id: Iedf163f858b42d2ca63a7a65d6e457539de59ab9 Reviewed-on: http://gerrit.cloudera.org:8080/7572 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b55ec3f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b55ec3f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b55ec3f6 Branch: refs/heads/master Commit: b55ec3f64f2a16259d4c5cd2e881701fee4c603f Parents: 79205bb Author: Henry Robinson <[email protected]> Authored: Mon Jul 31 20:30:40 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Aug 4 07:54:00 2017 +0000 ---------------------------------------------------------------------- be/src/util/parquet-reader.cc | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b55ec3f6/be/src/util/parquet-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc index afbb33d..e21cff6 100644 --- a/be/src/util/parquet-reader.cc +++ b/be/src/util/parquet-reader.cc @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +#include <snappy.h> #include <iostream> #include <sstream> #include <vector> #include <gflags/gflags.h> -#include <snappy.h> #include "gen-cpp/parquet_types.h" // TCompactProtocol requires some #defines to work right. @@ -28,10 +28,10 @@ #define ARITHMETIC_RIGHT_SHIFT 1 #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wstring-plus-int" -#include <thrift/protocol/TCompactProtocol.h> +#include <thrift/TApplicationException.h> #include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/protocol/TCompactProtocol.h> #include <thrift/protocol/TDebugProtocol.h> -#include <thrift/TApplicationException.h> #include <thrift/transport/TBufferTransports.h> #pragma clang diagnostic pop @@ -68,8 +68,8 @@ boost::shared_ptr<TProtocol> CreateDeserializeProtocol( // all the bytes needed to store the thrift message. On return, len will be // set to the actual length of the header. template <class T> -bool DeserializeThriftMsg(uint8_t* buf, uint32_t* len, bool compact, - T* deserialized_msg) { +bool DeserializeThriftMsg( + uint8_t* buf, uint32_t* len, bool compact, T* deserialized_msg) { // Deserialize msg bytes into c++ thrift msg using memory transport. boost::shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len)); boost::shared_ptr<TProtocol> tproto = @@ -91,8 +91,8 @@ string TypeMapping(Type::type t) { return "UNKNOWN"; } -void AppendSchema(const vector<SchemaElement>& schema, int level, - int* idx, stringstream* ss) { +void AppendSchema( + const vector<SchemaElement>& schema, int level, int* idx, stringstream* ss) { for (int i = 0; i < level; ++i) { (*ss) << " "; } @@ -122,8 +122,8 @@ string GetSchema(const FileMetaData& md) { // Inherit from RleDecoder to get access to repeat_count_, which is protected. class ParquetLevelReader : public impala::RleDecoder { public: - ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width) : - RleDecoder(buffer, buffer_len, bit_width) {} + ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width) + : RleDecoder(buffer, buffer_len, bit_width) {} uint32_t repeat_count() const { return repeat_count_; } }; @@ -143,13 +143,13 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_ decompressed_buffer.resize(header.uncompressed_page_size); boost::scoped_ptr<impala::Codec> decompressor; - impala::Codec::CreateDecompressor(NULL, false, - impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor); + impala::Codec::CreateDecompressor( + NULL, false, impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor); uint8_t* buffer_ptr = decompressed_buffer.data(); int uncompressed_page_size = header.uncompressed_page_size; - impala::Status s = decompressor->ProcessBlock32(true, header.compressed_page_size, - data, &uncompressed_page_size, &buffer_ptr); + impala::Status s = decompressor->ProcessBlock32( + true, header.compressed_page_size, data, &uncompressed_page_size, &buffer_ptr); if (!s.ok()) { cerr << "Error: Decompression failed: " << s.GetDetail() << " \n"; exit(1); @@ -206,16 +206,16 @@ int main(int argc, char** argv) { cerr << "File Length: " << file_len << endl; - uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(file_len)); + vector<uint8_t> buffer_vector(file_len); + uint8_t* buffer = buffer_vector.data(); size_t bytes_read = fread(buffer, 1, file_len, file); assert(bytes_read == file_len); (void)bytes_read; // Check file starts and ends with magic bytes - assert( - memcmp(buffer, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0); + assert(memcmp(buffer, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0); assert(memcmp(buffer + file_len - sizeof(PARQUET_VERSION_NUMBER), - PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0); + PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0); // Get metadata uint8_t* metadata_len_ptr = @@ -253,7 +253,7 @@ int main(int argc, char** argv) { int first_page_offset = col.meta_data.data_page_offset; if (col.meta_data.__isset.dictionary_page_offset) { first_page_offset = ::min(first_page_offset, - (int)col.meta_data.dictionary_page_offset); + static_cast<int>(col.meta_data.dictionary_page_offset)); } uint8_t* data = buffer + first_page_offset; uint8_t* col_end = data + col.meta_data.total_compressed_size; @@ -288,7 +288,7 @@ int main(int argc, char** argv) { num_rows += column_num_rows[0]; } double compression_ratio = - (double)total_uncompressed_data_size / total_compressed_data_size; + static_cast<double>(total_uncompressed_data_size) / total_compressed_data_size; stringstream ss; ss << "\nSummary:\n" << " Rows: " << num_rows << endl
