IMPALA-5627: fix dropped statuses in HDFS writers

The change is mostly mechanical - added Status returns where needed.
In one place I restructured the logic around 'current_encoding_' for Parquet
to allow a cleaner solution to the dropped status from the FinalizeCurrentPage()
call in ProcessValue(): after the restructuring the call was no longer needed.
'current_encoding_' was overloaded to represent both the encoding of the current
page and the preferred encoding for subsequent pages.

Testing: Ran exhaustive build.

Change-Id: I753d352c640faf5eaef650cd743e53de53761431
Reviewed-on: http://gerrit.cloudera.org:8080/7372
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/daff8eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/daff8eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/daff8eb0

Branch: refs/heads/master
Commit: daff8eb0ca19aa612c9fc7cc2ddd647735b31266
Parents: 54865c4
Author: Tim Armstrong <[email protected]>
Authored: Thu Jul 6 18:41:07 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Jul 21 02:51:51 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-table-writer.cc     |  2 +-
 be/src/exec/hdfs-avro-table-writer.h      | 20 +++---
 be/src/exec/hdfs-parquet-table-writer.cc  | 88 ++++++++++++++++----------
 be/src/exec/hdfs-parquet-table-writer.h   | 16 ++---
 be/src/exec/hdfs-sequence-table-writer.cc |  2 +-
 be/src/exec/hdfs-table-writer.h           | 11 ++--
 be/src/exec/parquet-column-stats.cc       |  7 +-
 be/src/exec/parquet-column-stats.h        | 10 ++-
 be/src/exec/parquet-column-stats.inline.h |  7 +-
 be/src/runtime/string-buffer-test.cc      |  6 +-
 be/src/runtime/string-buffer.h            |  6 +-
 11 files changed, 101 insertions(+), 74 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index 46185e8..3ce296d 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -196,7 +196,7 @@ Status HdfsAvroTableWriter::AppendRows(
     }
   }
 
-  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) Flush();
+  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
   *new_file = false;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index f85659e..6966860 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -68,17 +68,17 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
 
   virtual ~HdfsAvroTableWriter() { }
 
-  virtual Status Init();
-  virtual Status Finalize() { return Flush(); }
-  virtual Status InitNewFile() { return WriteFileHeader(); }
-  virtual void Close();
-  virtual uint64_t default_block_size() const { return 0; }
-  virtual std::string file_extension() const { return "avro"; }
+  virtual Status Init() override;
+  virtual Status Finalize() override { return Flush(); }
+  virtual Status InitNewFile() override { return WriteFileHeader(); }
+  virtual void Close() override;
+  virtual uint64_t default_block_size() const override { return 0; }
+  virtual std::string file_extension() const override { return "avro"; }
 
   /// Outputs the given rows into an HDFS sequence file. The rows are buffered
   /// to fill a sequence file block.
-  virtual Status AppendRows(
-      RowBatch* rows, const std::vector<int32_t>& row_group_indices, bool* new_file);
+  virtual Status AppendRows(RowBatch* rows,
+      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
 
  private:
   /// Processes a single row, appending to out_
@@ -88,11 +88,11 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
   inline void AppendField(const ColumnType& type, const void* value);
 
   /// Writes the Avro file header to HDFS
-  Status WriteFileHeader();
+  Status WriteFileHeader() WARN_UNUSED_RESULT;
 
   /// Writes the contents of out_ to HDFS as a single Avro file block.
   /// Returns an error if write to HDFS fails.
-  Status Flush();
+  Status Flush() WARN_UNUSED_RESULT;
 
   /// Buffer which holds accumulated output
   WriteStream out_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 5a2d810..04a81f1 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -103,8 +103,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
       values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
       page_stats_base_(nullptr),
       row_group_stats_base_(nullptr) {
-    Codec::CreateCompressor(nullptr, false, codec, &compressor_);
-
     def_levels_ = parent_->state_->obj_pool()->Add(
         new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
             DEFAULT_DATA_PAGE_SIZE, 1));
@@ -113,13 +111,20 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   virtual ~BaseColumnWriter() {}
 
+  // Called after the constructor to initialize the column writer.
+  Status Init() WARN_UNUSED_RESULT {
+    Reset();
+    RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_));
+    return Status::OK();
+  }
+
   // Appends the row to this column. This buffers the value into a data page. Returns
   // error if the space needed for the encoded value is larger than the data page size.
   // TODO: this needs to be batch based, instead of row based for better performance.
   // This is a bit trickier to handle the case where only a partial row batch can be
   // output to the current file because it reaches the max file size. Enabling codegen
   // would also solve this problem.
-  Status AppendRow(TupleRow* row);
+  Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT;
 
   // Flushes all buffered data pages to the file.
   // *file_pos is an output parameter and will be incremented by
@@ -128,13 +133,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // will contain the byte offset for the data page and dictionary page. They
   // will be set to -1 if the column does not contain that type of page.
   Status Flush(int64_t* file_pos, int64_t* first_data_page,
-      int64_t* first_dictionary_page);
+      int64_t* first_dictionary_page) WARN_UNUSED_RESULT;
 
   // Materializes the column statistics to the per-file MemPool so they are available
   // after their row batch buffer has been freed.
-  void MaterializeStatsValues() {
-    row_group_stats_base_->MaterializeStringValuesToInternalBuffers();
-    page_stats_base_->MaterializeStringValuesToInternalBuffers();
+  Status MaterializeStatsValues() WARN_UNUSED_RESULT {
+    RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers());
+    RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers());
+    return Status::OK();
   }
 
   // Encodes the row group statistics into a parquet::Statistics object and attaches it to
@@ -157,6 +163,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
     current_encoding_ = Encoding::PLAIN;
+    next_page_encoding_ = Encoding::PLAIN;
     column_encodings_.clear();
     dict_encoding_stats_.clear();
     data_encoding_stats_.clear();
@@ -184,16 +191,18 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   friend class HdfsParquetTableWriter;
 
   // Encodes value into the current page output buffer and updates the column statistics
-  // aggregates. Returns true if the value fits on the current page. If this function
-  // returned false, the caller should create a new page and try again with the same
-  // value.
+  // aggregates. Returns true if the value was appended successfully to the current page.
+  // Returns false if the value was not appended to the current page and the caller can
+  // create a new page and try again with the same value. May change
+  // 'next_page_encoding_' if the encoding for the next page should be different - e.g.
+  // if a dictionary overflowed and dictionary encoding is no longer viable.
   // *bytes_needed will contain the (estimated) number of bytes needed to successfully
   // encode the value in the page.
   // Implemented in the subclass.
-  virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0;
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0;
 
   // Encodes out all data for the current page and updates the metadata.
-  virtual void FinalizeCurrentPage();
+  virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
 
   // Update current_page_ to a new page, reusing pages allocated if possible.
   void NewPage();
@@ -246,10 +255,16 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Pointer to the current page in 'pages_'. Not owned.
   DataPage* current_page_;
 
-  int64_t num_values_; // Total number of values across all pages, including nullptr.
+  // Total number of values across all pages, including NULL.
+  int64_t num_values_;
   int64_t total_compressed_byte_size_;
   int64_t total_uncompressed_byte_size_;
 
+  // Encoding of the current page.
   Encoding::type current_encoding_;
 
+  // Encoding to use for the next page. By default, the same as 'current_encoding_'.
+  // Used by the column writer to switch encoding while writing a column, e.g. if the
+  // dictionary overflows.
+  Encoding::type next_page_encoding_;
 
   // Set of all encodings used in the column chunk
   unordered_set<Encoding::type> column_encodings_;
@@ -299,6 +314,7 @@ class HdfsParquetTableWriter::ColumnWriter :
     // Default to dictionary encoding. If the cardinality ends up being too high,
     // it will fall back to plain.
     current_encoding_ = Encoding::PLAIN_DICTIONARY;
+    next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
     dict_encoder_.reset(
         new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
     dict_encoder_base_ = dict_encoder_.get();
@@ -321,10 +337,9 @@ class HdfsParquetTableWriter::ColumnWriter :
       ++num_values_since_dict_size_check_;
       *bytes_needed = dict_encoder_->Put(*CastValue(value));
       // If the dictionary contains the maximum number of values, switch to plain
-      // encoding. The current dictionary encoded page is written out.
+      // encoding for the next page. The current page is full and must be written out.
       if (UNLIKELY(*bytes_needed < 0)) {
-        FinalizeCurrentPage();
-        current_encoding_ = Encoding::PLAIN;
+        next_page_encoding_ = Encoding::PLAIN;
         return false;
       }
       parent_->file_size_estimate_ += *bytes_needed;
@@ -423,15 +438,16 @@ class HdfsParquetTableWriter::BoolColumnWriter :
     return true;
   }
 
-  virtual void FinalizeCurrentPage() {
+  virtual Status FinalizeCurrentPage() {
     DCHECK(current_page_ != nullptr);
-    if (current_page_->finalized) return;
+    if (current_page_->finalized) return Status::OK();
     bool_values_->Flush();
     int num_bytes = bool_values_->bytes_written();
     current_page_->header.uncompressed_page_size += num_bytes;
     // Call into superclass to handle the rest.
-    BaseColumnWriter::FinalizeCurrentPage();
+    RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage());
     bool_values_->Clear();
+    return Status::OK();
   }
 
  private:
@@ -455,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // Ensure that we have enough space for the definition level, but don't write it yet in
   // case we don't have enough space for the value.
   if (def_levels_->buffer_full()) {
-    FinalizeCurrentPage();
+    RETURN_IF_ERROR(FinalizeCurrentPage());
     NewPage();
   }
 
@@ -475,11 +491,11 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
     int64_t bytes_needed = 0;
     if (ProcessValue(value, &bytes_needed)) {
       ++current_page_->num_non_null;
-      break;
+      break; // Successfully appended, don't need to retry.
     }
 
     // Value didn't fit on page, try again on a new page.
-    FinalizeCurrentPage();
+    RETURN_IF_ERROR(FinalizeCurrentPage());
 
     // Check how much space is needed to write this value. If that is larger than the
     // page size then increase page size and try again.
@@ -534,7 +550,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     return Status::OK();
   }
 
-  FinalizeCurrentPage();
+  RETURN_IF_ERROR(FinalizeCurrentPage());
 
   *first_dictionary_page = -1;
   // First write the dictionary page before any of the data pages.
@@ -563,8 +579,8 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
     header.compressed_page_size = max_compressed_size;
-    compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
-        &header.compressed_page_size, &compressed_data);
+    RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+        dict_buffer, &header.compressed_page_size, &compressed_data));
     dict_buffer = compressed_data;
     // We allocated the output based on the guessed size, return the extra allocated
     // bytes back to the mem pool.
@@ -614,11 +630,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
   return Status::OK();
 }
 
-void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
+Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   DCHECK(current_page_ != nullptr);
-  if (current_page_->finalized) return;
+  if (current_page_->finalized) return Status::OK();
 
-  // If the entire page was nullptr, encode it as PLAIN since there is no
+  // If the entire page was NULL, encode it as PLAIN since there is no
   // data anyway. We don't output a useless dictionary page and it works
   // around a parquet MR bug (see IMPALA-759 for more details).
   if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
@@ -670,8 +686,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
     DCHECK_GT(max_compressed_size, 0);
     uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
     header.compressed_page_size = max_compressed_size;
-    compressor_->ProcessBlock32(true, header.uncompressed_page_size, uncompressed_data,
-        &header.compressed_page_size, &compressed_data);
+    RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+        uncompressed_data, &header.compressed_page_size, &compressed_data));
     current_page_->data = compressed_data;
 
     // We allocated the output based on the guessed size, return the extra allocated
@@ -694,14 +710,15 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   // Add the size of the data page header
   uint8_t* header_buffer;
   uint32_t header_len = 0;
-  parent_->thrift_serializer_->Serialize(
-      &current_page_->header, &header_len, &header_buffer);
+  RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
+      &current_page_->header, &header_len, &header_buffer));
 
   current_page_->finalized = true;
   total_compressed_byte_size_ += header_len + header.compressed_page_size;
   total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size;
   parent_->file_size_estimate_ += header_len + header.compressed_page_size;
   def_levels_->Clear();
+  return Status::OK();
 }
 
 void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
@@ -724,6 +741,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
     header.repetition_level_encoding = Encoding::BIT_PACKED;
     current_page_->header.__set_data_page_header(header);
   }
+  current_encoding_ = next_page_encoding_;
   current_page_->finalized = false;
   current_page_->num_non_null = 0;
   page_stats_base_->Reset();
@@ -828,7 +846,7 @@ Status HdfsParquetTableWriter::Init() {
         DCHECK(false);
     }
     columns_[i].reset(writer);
-    columns_[i]->Reset();
+    RETURN_IF_ERROR(columns_[i]->Init());
   }
   RETURN_IF_ERROR(CreateSchema());
   return Status::OK();
@@ -989,7 +1007,9 @@ Status HdfsParquetTableWriter::AppendRows(
   }
 
   // We exhausted the batch, so we materialize the statistics before releasing the memory.
-  for (unique_ptr<BaseColumnWriter>& column : columns_) column->MaterializeStatsValues();
+  for (unique_ptr<BaseColumnWriter>& column : columns_) {
+    RETURN_IF_ERROR(column->MaterializeStatsValues());
+  }
 
   // Reset the row_idx_ when we exhaust the batch. We can exit before exhausting
   // the batch if we run out of file space and will continue from the last index.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index b3d319e..1334b19 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -58,25 +58,25 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
 
   ~HdfsParquetTableWriter();
 
   /// Initialize column information.
-  virtual Status Init();
+  virtual Status Init() override;
 
   /// Initializes a new file. This resets the file metadata object and writes
   /// the file header to the output file.
-  virtual Status InitNewFile();
+  virtual Status InitNewFile() override;
 
   /// Appends parquet representation of rows in the batch to the current file.
-  virtual Status AppendRows(
-      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file);
+  virtual Status AppendRows(RowBatch* batch,
+      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
 
   /// Write out all the data.
-  virtual Status Finalize();
+  virtual Status Finalize() override;
 
-  virtual void Close();
+  virtual void Close() override;
 
   /// Returns the target HDFS block size to use.
-  virtual uint64_t default_block_size() const;
+  virtual uint64_t default_block_size() const override;
 
-  virtual std::string file_extension() const { return "parq"; }
+  virtual std::string file_extension() const override { return "parq"; }
 
  private:
   /// Default data page size. In bytes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index 4a66c5e..42a70f0 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -125,7 +125,7 @@ Status HdfsSequenceTableWriter::AppendRows(
     out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
   }
 
-  if (out_.Size() >= approx_block_size_) Flush();
+  if (out_.Size() >= approx_block_size_) RETURN_IF_ERROR(Flush());
   *new_file = false;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index cc08b00..cc6c6cc 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -64,10 +64,10 @@ class HdfsTableWriter {
   /// text), 1) is called once and 2-4) is called repeatedly for each file.
 
   /// Do initialization of writer.
-  virtual Status Init() = 0;
+  virtual Status Init() WARN_UNUSED_RESULT = 0;
 
   /// Called when a new file is started.
-  virtual Status InitNewFile() = 0;
+  virtual Status InitNewFile() WARN_UNUSED_RESULT = 0;
 
   /// Appends rows of 'batch' to the partition that are selected via 'row_group_indices',
   /// and if the latter is empty, appends every row.
   /// *new_file == true. A new file will be opened and the same row batch will be passed
   /// again. The writer must track how much of the batch it had already processed asking
   /// for a new file. Otherwise the writer will return with *newfile == false.
-  virtual Status AppendRows(
-      RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file) = 0;
+  virtual Status AppendRows(RowBatch* batch,
+      const std::vector<int32_t>& row_group_indices,
+      bool* new_file) WARN_UNUSED_RESULT = 0;
 
   /// Finalize this partition. The writer needs to finish processing
   /// all data have written out after the return from this call.
   /// This is called once for each call to InitNewFile()
-  virtual Status Finalize() = 0;
+  virtual Status Finalize() WARN_UNUSED_RESULT = 0;
 
   /// Called once when this writer should cleanup any resources.
   virtual void Close() = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index bcd6fa4..76b3365 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -117,11 +117,12 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
   return false;
 }
 
-void ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
-  if (value->ptr == buffer->buffer()) return;
+Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
+  if (value->ptr == buffer->buffer()) return Status::OK();
   buffer->Clear();
-  buffer->Append(value->ptr, value->len);
+  RETURN_IF_ERROR(buffer->Append(value->ptr, value->len));
   value->ptr = buffer->buffer();
+  return Status::OK();
 }
 
 bool ColumnStatsBase::CanUseStats(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 11a01f5..7278cdc 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -81,7 +81,9 @@ class ColumnStatsBase {
   /// data types (e.g. StringValue) need to be copied at the end of processing a row
   /// batch, since the batch memory will be released. Overwrite this method in derived
   /// classes to provide the functionality.
-  virtual void MaterializeStringValuesToInternalBuffers() {}
+  virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT {
+    return Status::OK();
+  }
 
   /// Returns the number of bytes needed to encode the current statistics into a
   /// parquet::Statistics object.
@@ -100,7 +102,7 @@ class ColumnStatsBase {
  protected:
   // Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
   // 'buffer' is reset before making the copy.
-  static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
+  static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT;
 
   /// Stores whether the min and max values of the current object have been initialized.
   bool has_min_max_values_;
@@ -163,7 +165,9 @@ class ColumnStats : public ColumnStatsBase {
   void Update(const T& v) { Update(v, v); }
 
   virtual void Merge(const ColumnStatsBase& other) override;
-  virtual void MaterializeStringValuesToInternalBuffers() override {}
+  virtual Status MaterializeStringValuesToInternalBuffers() override {
+    return Status::OK();
+  }
   virtual int64_t BytesNeeded() const override;
   virtual void EncodeToThrift(parquet::Statistics* out) const override;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index b112db3..9b81ba8 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -170,9 +170,10 @@ inline void ColumnStats<StringValue>::Update(
 // StringValues need to be copied at the end of processing a row batch, since the batch
 // memory will be released.
 template <>
-inline void ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
-  if (min_buffer_.IsEmpty()) CopyToBuffer(&min_buffer_, &min_value_);
-  if (max_buffer_.IsEmpty()) CopyToBuffer(&max_buffer_, &max_value_);
+inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
+  if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_));
+  if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_));
+  return Status::OK();
 }
 
 } // end ns impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc
index c370728..27d1021 100644
--- a/be/src/runtime/string-buffer-test.cc
+++ b/be/src/runtime/string-buffer-test.cc
@@ -50,12 +50,12 @@ TEST(StringBufferTest, Basic) {
 
   // Append to empty
   std_str.append("Hello");
-  str.Append("Hello", strlen("Hello"));
+  ASSERT_OK(str.Append("Hello", strlen("Hello")));
   ValidateString(std_str, str);
 
   // Append some more
   std_str.append("World");
-  str.Append("World", strlen("World"));
+  ASSERT_OK(str.Append("World", strlen("World")));
   ValidateString(std_str, str);
 
   // Clear
@@ -81,7 +81,7 @@ TEST(StringBufferTest, AppendBoundary) {
   std_str.resize(chunk_size, 'a');
   int64_t data_size = 0;
   while (data_size + chunk_size <= max_data_size) {
-    str.Append(std_str.c_str(), chunk_size);
+    ASSERT_OK(str.Append(std_str.c_str(), chunk_size));
     data_size += chunk_size;
   }
   EXPECT_EQ(str.buffer_size(), data_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/daff8eb0/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index 5682bc7..2188e4d 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -48,7 +48,7 @@ class StringBuffer {
 
   /// Append 'str' to the current string, allocating a new buffer as necessary.
   /// Return error status if memory limit is exceeded.
-  Status Append(const char* str, int64_t str_len) {
+  Status Append(const char* str, int64_t str_len) WARN_UNUSED_RESULT {
     int64_t new_len = len_ + str_len;
     if (UNLIKELY(new_len > buffer_size_)) RETURN_IF_ERROR(GrowBuffer(new_len));
     memcpy(buffer_ + len_, str, str_len);
@@ -57,7 +57,7 @@ class StringBuffer {
   }
 
   /// Wrapper around append() for input type 'uint8_t'.
-  Status Append(const uint8_t* str, int64_t str_len) {
+  Status Append(const uint8_t* str, int64_t str_len) WARN_UNUSED_RESULT {
    return Append(reinterpret_cast<const char*>(str), str_len);
   }
 
@@ -78,7 +78,7 @@ class StringBuffer {
   /// Grows the buffer to be at least 'new_size', copying over the previous data
   /// into the new buffer. The old buffer is not freed. Return an error status if
   /// growing the buffer will exceed memory limit.
-  Status GrowBuffer(int64_t new_size) {
+  Status GrowBuffer(int64_t new_size) WARN_UNUSED_RESULT {
     if (LIKELY(new_size > buffer_size_)) {
       int64_t old_size = buffer_size_;
       buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size);
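
For readers unfamiliar with the pattern this patch applies throughout - marking
Status-returning functions WARN_UNUSED_RESULT and propagating errors with
RETURN_IF_ERROR rather than silently dropping them - the standalone sketch below
illustrates it. The ExampleWriter class, its Flush()/AppendValue() methods and the
simplified Status/RETURN_IF_ERROR/WARN_UNUSED_RESULT definitions are illustrative
stand-ins only; Impala's real versions are more involved.

#include <cstdint>
#include <iostream>
#include <string>

// Simplified stand-ins for Impala's Status type and error-handling macros.
#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))

class Status {
 public:
  static Status OK() { return Status(""); }
  static Status Error(const std::string& msg) { return Status(msg); }
  bool ok() const { return msg_.empty(); }
  const std::string& msg() const { return msg_; }
 private:
  explicit Status(const std::string& msg) : msg_(msg) {}
  std::string msg_;
};

// Propagates a non-OK Status to the caller instead of dropping it.
#define RETURN_IF_ERROR(stmt)           \
  do {                                  \
    Status _status = (stmt);            \
    if (!_status.ok()) return _status;  \
  } while (false)

// Hypothetical buffering writer used only to demonstrate the pattern.
class ExampleWriter {
 public:
  // The attribute makes the compiler warn if a caller ignores the result.
  Status Flush() WARN_UNUSED_RESULT {
    if (sink_broken_) return Status::Error("write to storage failed");
    buffered_bytes_ = 0;
    return Status::OK();
  }

  // Before this patch, code like this called Flush() and discarded its Status;
  // now the error is surfaced to the caller via RETURN_IF_ERROR.
  Status AppendValue(int64_t bytes) WARN_UNUSED_RESULT {
    buffered_bytes_ += bytes;
    if (buffered_bytes_ > kBlockSize) RETURN_IF_ERROR(Flush());
    return Status::OK();
  }

  void set_sink_broken(bool broken) { sink_broken_ = broken; }

 private:
  static const int64_t kBlockSize = 64 * 1024;
  int64_t buffered_bytes_ = 0;
  bool sink_broken_ = false;
};

int main() {
  ExampleWriter writer;
  writer.set_sink_broken(true);
  // The returned Status must be consumed; ignoring it draws a compiler warning.
  Status s = writer.AppendValue(128 * 1024);
  if (!s.ok()) std::cerr << "append failed: " << s.msg() << std::endl;
  return 0;
}

With the attribute in place, a bare "writer.Flush();" call that discards the result
produces a compile-time warning, which makes dropped statuses like the ones fixed in
this change easy to spot.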
