Repository: incubator-impala Updated Branches: refs/heads/master 667a778af -> ed5ec6772
IMPALA-1619: Support 64-bit allocations. This change extends MemPool, FreePool and StringBuffer to support 64-bit allocations, fixes a bug in decompressor and extends various places in the code to support 64-bit allocation sizes. With this change, the text scanner can now decompress compressed files larger than 1GB. Note that the UDF interfaces FunctionContext::Allocate() and FunctionContext::Reallocate() still use 32-bit for the input argument to avoid breaking compatibility. In addition, the byte size of a tuple is still assumed to be within 32-bit. If it needs to be upgraded to 64-bit, it will be done in a separate change. A new test has been added to test the decompression of a 2GB snappy block compressed text file. Change-Id: Ic1af1564953ac02aca2728646973199381c86e5f Reviewed-on: http://gerrit.cloudera.org:8080/3575 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ed5ec677 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ed5ec677 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ed5ec677 Branch: refs/heads/master Commit: ed5ec6772fd24ad28901bc68f120c59597439cf2 Parents: 667a778 Author: Michael Ho <[email protected]> Authored: Thu Jun 30 15:15:27 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Fri Jul 8 15:42:09 2016 -0700 ---------------------------------------------------------------------- be/src/exec/delimited-text-parser.cc | 37 +++--- be/src/exec/delimited-text-parser.h | 40 ++++--- be/src/exec/delimited-text-parser.inline.h | 55 +++++---- be/src/exec/hdfs-scanner.cc | 2 +- be/src/exec/hdfs-scanner.h | 3 + be/src/exec/hdfs-sequence-scanner.cc | 8 +- be/src/exec/hdfs-text-scanner.cc | 54 ++++----- be/src/exec/hdfs-text-scanner.h | 6 +- be/src/exec/scanner-context.cc | 16 +-- be/src/runtime/buffered-block-mgr.cc | 21 ++-- be/src/runtime/collection-value-builder.h | 2 +- be/src/runtime/free-pool-test.cc | 27 ++++- be/src/runtime/free-pool.h | 19 +-- be/src/runtime/mem-pool-test.cc | 49 +++++--- be/src/runtime/mem-pool.h | 8 +- be/src/runtime/string-buffer-test.cc | 20 ++-- be/src/runtime/string-buffer.h | 116 +++++++++---------- be/src/udf/udf-internal.h | 4 +- be/src/udf/udf.cc | 4 +- be/src/udf/udf.h | 2 + be/src/util/bit-util.h | 22 ++-- be/src/util/codec.cc | 12 +- be/src/util/codec.h | 5 - be/src/util/decompress-test.cc | 4 +- be/src/util/decompress.cc | 67 +++-------- common/thrift/generate_error_codes.py | 3 + testdata/bin/create-load-data.sh | 5 + testdata/compressed_formats/README | 4 + .../compressed_formats/compressed_payload.snap | Bin 0 -> 37270 bytes tests/query_test/test_compressed_formats.py | 95 +++++++++------ 30 files changed, 370 insertions(+), 340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc index 950eae4..1a785ac 100644 --- a/be/src/exec/delimited-text-parser.cc +++ b/be/src/exec/delimited-text-parser.cc @@ -116,11 +116,11 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) { if (process_escapes_) { - ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr, 
row_end_locations, - field_locations, num_tuples, num_fields, next_column_start); + RETURN_IF_ERROR(ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr, + row_end_locations, field_locations, num_tuples, num_fields, next_column_start)); } else { - ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations, - field_locations, num_tuples, num_fields, next_column_start); + RETURN_IF_ERROR(ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr, + row_end_locations, field_locations, num_tuples, num_fields, next_column_start)); } } @@ -155,9 +155,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin // If the row ended in \r\n then move to the \n ++*next_column_start; } else { - AddColumn<true>(*byte_buffer_ptr - *next_column_start, - next_column_start, num_fields, field_locations); - FillColumns<false>(0, NULL, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start, + next_column_start, num_fields, field_locations)); + Status status = FillColumns<false>(0, NULL, num_fields, field_locations); + DCHECK(status.ok()); column_idx_ = num_partition_keys_; row_end_locations[*num_tuples] = *byte_buffer_ptr; ++(*num_tuples); @@ -171,8 +172,8 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin return Status::OK(); } } else if (new_col) { - AddColumn<true>(*byte_buffer_ptr - *next_column_start, - next_column_start, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start, + next_column_start, num_fields, field_locations)); } --remaining_len; @@ -183,9 +184,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin // e.g. Sequence files. if (tuple_delim_ == '\0') { DCHECK_EQ(remaining_len, 0); - AddColumn<true>(*byte_buffer_ptr - *next_column_start, - next_column_start, num_fields, field_locations); - FillColumns<false>(0, NULL, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<true>(*byte_buffer_ptr - *next_column_start, + next_column_start, num_fields, field_locations)); + Status status = FillColumns<false>(0, NULL, num_fields, field_locations); + DCHECK(status.ok()); column_idx_ = num_partition_keys_; ++(*num_tuples); unfinished_tuple_ = false; @@ -193,11 +195,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin return Status::OK(); } -// Find the first instance of the tuple delimiter. This will -// find the start of the first full tuple in buffer by looking for the end of -// the previous tuple. -int DelimitedTextParser::FindFirstInstance(const char* buffer, int len) { - int tuple_start = 0; +// Find the first instance of the tuple delimiter. This will find the start of the first +// full tuple in buffer by looking for the end of the previous tuple. +int64_t DelimitedTextParser::FindFirstInstance(const char* buffer, int64_t len) { + int64_t tuple_start = 0; const char* buffer_start = buffer; bool found = false; @@ -256,7 +257,7 @@ restart: // tuple break that are all escape characters, but that is // unlikely. int num_escape_chars = 0; - int before_tuple_end = tuple_start - 2; + int64_t before_tuple_end = tuple_start - 2; // TODO: If scan range is split between escape character and tuple delimiter, // before_tuple_end will be -1. Need to scan previous range for escape characters // in this case. 
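The pattern these parser changes follow — computing column lengths in 64-bit, rejecting anything that will not fit the 32-bit field length, and propagating the failure with RETURN_IF_ERROR instead of a void return — can be sketched standalone. The Status struct, RETURN_IF_ERROR macro, helper names and error text below are simplified stand-ins for Impala's common/status.h machinery, not the real definitions:

#include <cstdint>
#include <iostream>
#include <limits>
#include <string>

// Simplified stand-ins for Impala's Status and RETURN_IF_ERROR from
// common/status.h; the real ones carry error codes, not just a message.
struct Status {
  std::string msg;
  static Status OK() { return Status(); }
  bool ok() const { return msg.empty(); }
};

#define RETURN_IF_ERROR(stmt)        \
  do {                               \
    Status _s = (stmt);              \
    if (!_s.ok()) return _s;         \
  } while (false)

// Same check as the new BitUtil::IsNonNegative32Bit(): a negative value wraps
// to a huge unsigned number, so one comparison covers both bounds.
static bool IsNonNegative32Bit(int64_t value) {
  return static_cast<uint64_t>(value) <=
      static_cast<uint64_t>(std::numeric_limits<int32_t>::max());
}

// Sketch of the AddColumn() guard: lengths are computed in 64-bit, but the
// stored FieldLocation-style length is still 32-bit, so an oversized column
// becomes an error instead of a silent integer overflow.
Status AddColumn(int64_t len, int32_t* stored_len) {
  if (!IsNonNegative32Bit(len)) {
    return Status{"column of " + std::to_string(len) + " bytes exceeds 32-bit limit"};
  }
  *stored_len = static_cast<int32_t>(len);
  return Status::OK();
}

Status ParseRow(const int64_t* col_lens, int num_cols, int32_t* out) {
  for (int i = 0; i < num_cols; ++i) {
    // Propagate instead of returning void, as the patch does.
    RETURN_IF_ERROR(AddColumn(col_lens[i], &out[i]));
  }
  return Status::OK();
}

int main() {
  const int64_t lens[] = {12, 1LL << 31};  // second column is one byte too long
  int32_t out[2];
  const Status s = ParseRow(lens, 2, out);
  std::cout << (s.ok() ? "ok" : s.msg) << std::endl;
  return 0;
}

Note how the patch still wraps FillColumns<false>(0, NULL, ...) in a DCHECK(status.ok()) rather than RETURN_IF_ERROR: with a length of 0 the new guard can never fire, so the status is asserted rather than propagated.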
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.h ---------------------------------------------------------------------- diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h index f3e108c..d754c74 100644 --- a/be/src/exec/delimited-text-parser.h +++ b/be/src/exec/delimited-text-parser.h @@ -32,9 +32,9 @@ class DelimitedTextParser { /// collection_item_delim: delimits collection items /// escape_char: escape delimiters, make them part of the data. // - /// num_cols is the total number of columns including partition keys. + /// 'num_cols' is the total number of columns including partition keys. // - /// is_materialized_col should be initialized to an array of length 'num_cols', with + /// 'is_materialized_col' should be initialized to an array of length 'num_cols', with /// is_materialized_col[i] = <true if column i should be materialized, false otherwise> /// Owned by caller. // @@ -73,6 +73,8 @@ class DelimitedTextParser { /// num_fields: Number of materialized fields parsed /// next_column_start: pointer within file_buffer_ where the next field starts /// after the return from the call to ParseData + /// Returns an error status if any column exceeds the size limit. + /// See AddColumn() for details. Status ParseFieldLocations(int max_tuples, int64_t remaining_len, char** byte_buffer_ptr, char** row_end_locations, FieldLocation* field_locations, @@ -84,9 +86,10 @@ class DelimitedTextParser { /// col. /// - *num_fields returns the number of fields processed. /// This function is used to parse sequence file records which do not need to - /// parse for tuple delimiters. + /// parse for tuple delimiters. Returns an error status if any column exceeds the + /// size limit. See AddColumn() for details. template <bool process_escapes> - void ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations, + Status ParseSingleTuple(int64_t len, char* buffer, FieldLocation* field_locations, int* num_fields); /// FindFirstInstance returns the position after the first non-escaped tuple /// delimiter from the starting offset. /// Used to find the start of a tuple if jumping into the middle of a text file. /// Also used to find the sync marker for Sequenced and RC files. /// If no tuple delimiter is found within the buffer, return -1; - int FindFirstInstance(const char* buffer, int len); + int64_t FindFirstInstance(const char* buffer, int64_t len); /// Will we return the current column to the query? /// Hive allows cols at the end of the table that are not in the schema. We'll /// just ignore those. return column_idx_ < num_cols_ && is_materialized_col_[column_idx_]; } /// Fill in columns missing at the end of the tuple. - /// len and last_column may contain the length and the pointer to the + /// 'len' and 'last_column' may contain the length and the pointer to the /// last column on which the file ended without a delimiter. /// Fills in the offsets and lengths in field_locations. - /// If parsing stopped on a delimiter and there is no last column then len will be 0. + /// If parsing stopped on a delimiter and there is no last column then length will be 0. /// Other columns beyond that are filled with 0 length fields. - /// num_fields points to an initialized count of fields and will incremented + /// 'num_fields' points to an initialized count of fields and will be incremented /// by the number of fields added. - /// field_locations will be updated with the start and length of the fields.
+ /// 'field_locations' will be updated with the start and length of the fields. + /// Returns an error status if 'len' exceeds the size limit specified in AddColumn(). template <bool process_escapes> - void FillColumns(int len, char** last_column, - int* num_fields, impala::FieldLocation* field_locations); + Status FillColumns(int64_t len, char** last_column, int* num_fields, + impala::FieldLocation* field_locations); /// Return true if we have not seen a tuple delimiter for the current tuple being /// parsed (i.e., the last byte read was not a tuple delimiter). @@ -128,24 +132,28 @@ class DelimitedTextParser { /// Template parameter: /// process_escapes -- if true the column may have escape characters /// and the negative of the len will be stored. - /// len: lenght of the current column. + /// len: length of the current column. The length of a column must fit in a 32-bit + /// signed integer (i.e. <= 2147483647 bytes). If a column is larger than that, + /// it will be treated as an error. /// Input/Output: /// next_column_start: Start of the current column, moved to the start of the next. /// num_fields: current number of fields processed, updated to next field. /// Output: /// field_locations: updated with start and length of current field. + /// Return an error status if 'len' exceeds the size limit specified above. template <bool process_escapes> - void AddColumn(int len, char** next_column_start, int* num_fields, - FieldLocation* field_locations); + Status AddColumn(int64_t len, char** next_column_start, int* num_fields, + FieldLocation* field_locations); /// Helper routine to parse delimited text using SSE instructions. /// Identical arguments as ParseFieldLocations. /// If the template argument, 'process_escapes' is true, this function will handle /// escapes, otherwise, it will assume the text is unescaped. By using templates, /// we can special case the un-escaped path for better performance. The unescaped - /// path is optimized away by the compiler. + /// path is optimized away by the compiler. Returns an error status if the length + /// of any column exceeds the size limit. See AddColumn() for details. template <bool process_escapes> - void ParseSse(int max_tuples, int64_t* remaining_len, + Status ParseSse(int max_tuples, int64_t* remaining_len, char** byte_buffer_ptr, char** row_end_locations_, FieldLocation* field_locations, int* num_tuples, int* num_fields, char** next_column_start); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/delimited-text-parser.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h index e96a763..28b5b4b 100644 --- a/be/src/exec/delimited-text-parser.inline.h +++ b/be/src/exec/delimited-text-parser.inline.h @@ -26,7 +26,7 @@ namespace impala { /// If the character at n is an escape character, then delimiters (tuple/field/escape /// characters) at n+1 don't count. inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape, - uint16_t* delim_mask) { + uint16_t* delim_mask) { // Escape characters can escape escape characters.
bool first_char_is_escape = *last_char_is_escape; bool escape_next = first_char_is_escape; @@ -39,7 +39,7 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape, // Remember last character for the next iteration *last_char_is_escape = escape_mask & - SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1]; + SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1]; // Shift escape mask up one so they match at the same bit index as the tuple and // field mask (instead of being the character before) and set the correct first bit @@ -50,35 +50,41 @@ inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape, } template <bool process_escapes> -inline void DelimitedTextParser::AddColumn(int len, char** next_column_start, +inline Status DelimitedTextParser::AddColumn(int64_t len, char** next_column_start, int* num_fields, FieldLocation* field_locations) { + if (UNLIKELY(!BitUtil::IsNonNegative32Bit(len))) { + return Status(TErrorCode::TEXT_PARSER_TRUNCATED_COLUMN, len); + } if (ReturnCurrentColumn()) { // Found a column that needs to be parsed, write the start/len to 'field_locations' field_locations[*num_fields].start = *next_column_start; + int64_t field_len = len; if (process_escapes && current_column_has_escape_) { - field_locations[*num_fields].len = -len; - } else { - field_locations[*num_fields].len = len; + field_len = -len; } + field_locations[*num_fields].len = static_cast<int32_t>(field_len); ++(*num_fields); } if (process_escapes) current_column_has_escape_ = false; *next_column_start += len + 1; ++column_idx_; + return Status::OK(); } template <bool process_escapes> -void inline DelimitedTextParser:: FillColumns(int len, char** last_column, +inline Status DelimitedTextParser::FillColumns(int64_t len, char** last_column, int* num_fields, FieldLocation* field_locations) { // Fill in any columns missing from the end of the tuple. char* dummy = NULL; if (last_column == NULL) last_column = &dummy; while (column_idx_ < num_cols_) { - AddColumn<process_escapes>(len, last_column, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<process_escapes>(len, last_column, + num_fields, field_locations)); // The rest of the columns will be null. last_column = &dummy; len = 0; } + return Status::OK(); } /// SSE optimized raw text file parsing. 
SSE4_2 added an instruction (with 3 modes) for @@ -95,10 +101,9 @@ void inline DelimitedTextParser:: FillColumns(int len, char** last_column, /// Haystack = 'asdfghjklhjbdwwc' (the raw string) /// Result = '1010000000011001' template <bool process_escapes> -inline void DelimitedTextParser::ParseSse(int max_tuples, +inline Status DelimitedTextParser::ParseSse(int max_tuples, int64_t* remaining_len, char** byte_buffer_ptr, - char** row_end_locations, - FieldLocation* field_locations, + char** row_end_locations, FieldLocation* field_locations, int* num_tuples, int* num_fields, char** next_column_start) { DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); @@ -172,8 +177,8 @@ inline void DelimitedTextParser::ParseSse(int max_tuples, char* delim_ptr = *byte_buffer_ptr + n; if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) { - AddColumn<process_escapes>(delim_ptr - *next_column_start, - next_column_start, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start, + next_column_start, num_fields, field_locations)); continue; } @@ -185,9 +190,10 @@ inline void DelimitedTextParser::ParseSse(int max_tuples, last_row_delim_offset_ = -1; continue; } - AddColumn<process_escapes>(delim_ptr - *next_column_start, - next_column_start, num_fields, field_locations); - FillColumns<false>(0, NULL, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<process_escapes>(delim_ptr - *next_column_start, + next_column_start, num_fields, field_locations)); + Status status = FillColumns<false>(0, NULL, num_fields, field_locations); + DCHECK(status.ok()); column_idx_ = num_partition_keys_; row_end_locations[*num_tuples] = delim_ptr; ++(*num_tuples); @@ -200,7 +206,7 @@ inline void DelimitedTextParser::ParseSse(int max_tuples, // If the last character we processed was \r then set the offset to 0 // so that we will use it at the beginning of the next batch. if (last_row_delim_offset_ == *remaining_len) last_row_delim_offset_ = 0; - return; + return Status::OK(); } } } @@ -214,11 +220,12 @@ inline void DelimitedTextParser::ParseSse(int max_tuples, *remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER; *byte_buffer_ptr += SSEUtil::CHARS_PER_128_BIT_REGISTER; } + return Status::OK(); } /// Simplified version of ParseSSE which does not handle tuple delimiters. 
template <bool process_escapes> -inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer, +inline Status DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer, FieldLocation* field_locations, int* num_fields) { char* next_column_start = buffer; __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask; @@ -263,8 +270,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b // clear current bit delim_mask &= ~(SSEUtil::SSE_BITMASK[n]); - AddColumn<process_escapes>(buffer + n - next_column_start, - &next_column_start, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<process_escapes>(buffer + n - next_column_start, + &next_column_start, num_fields, field_locations)); } if (process_escapes) { @@ -288,8 +295,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b if (!last_char_is_escape_ && (*buffer == field_delim_ || *buffer == collection_item_delim_)) { - AddColumn<process_escapes>(buffer - next_column_start, - &next_column_start, num_fields, field_locations); + RETURN_IF_ERROR(AddColumn<process_escapes>(buffer - next_column_start, + &next_column_start, num_fields, field_locations)); } --remaining_len; @@ -298,8 +305,8 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b // Last column does not have a delimiter after it. Add that column and also // pad with empty cols if the input is ragged. - FillColumns<process_escapes>(buffer - next_column_start, - &next_column_start, num_fields, field_locations); + return FillColumns<process_escapes>(buffer - next_column_start, - &next_column_start, num_fields, field_locations); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 78d4994..275956c 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -186,7 +186,7 @@ Status HdfsScanner::CommitRows(int num_rows) { DCHECK(batch_ != NULL); DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows()); batch_->CommitRows(num_rows); - tuple_mem_ += scan_node_->tuple_desc()->byte_size() * num_rows; + tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows; // We need to pass the row batch to the scan node if there is too much memory attached, // which can happen if the query is very selective. We need to release memory even http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 7f804f1..9069451 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -58,6 +58,9 @@ struct FieldLocation { char* start; /// Encodes the length and whether or not this field needs to be unescaped. /// If len < 0, then the field needs to be unescaped. + /// + /// Currently, 'len' has to fit in a 32-bit integer as that's the limit for StringValue + /// and StringVal. All other types shouldn't be anywhere near this limit.
int len; static const char* LLVM_CLASS_NAME; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index 1460b1a..0cd000f 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -229,15 +229,15 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() { for (int i = 0; i < num_to_process; ++i) { int num_fields = 0; if (delimited_text_parser_->escape_char() == '\0') { - delimited_text_parser_->ParseSingleTuple<false>( + RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<false>( record_locations_[i].len, reinterpret_cast<char*>(record_locations_[i].record), - &field_locations_[field_location_offset], &num_fields); + &field_locations_[field_location_offset], &num_fields)); } else { - delimited_text_parser_->ParseSingleTuple<true>( + RETURN_IF_ERROR(delimited_text_parser_->ParseSingleTuple<true>( record_locations_[i].len, reinterpret_cast<char*>(record_locations_[i].record), - &field_locations_[field_location_offset], &num_fields); + &field_locations_[field_location_offset], &num_fields)); } DCHECK_EQ(num_fields, scan_node_->materialized_slots().size()); field_location_offset += num_fields; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 6e501cc..6cc308d 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -260,8 +260,8 @@ Status HdfsTextScanner::FinishScanRange() { // tuple. DCHECK(!delimited_text_parser_->HasUnfinishedTuple()); DCHECK(partial_tuple_empty_); - DCHECK(boundary_column_.Empty()); - DCHECK(boundary_row_.Empty()); + DCHECK(boundary_column_.IsEmpty()); + DCHECK(boundary_row_.IsEmpty()); return Status::OK(); } @@ -286,24 +286,24 @@ Status HdfsTextScanner::FinishScanRange() { ss << "Read failed while trying to finish scan range: " << stream_->filename() << ":" << stream_->file_offset() << endl << status.GetDetail(); RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str()))); - } else if (!partial_tuple_empty_ || !boundary_column_.Empty() || - !boundary_row_.Empty() || + } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() || + !boundary_row_.IsEmpty() || (delimited_text_parser_->HasUnfinishedTuple() && (!scan_node_->materialized_slots().empty() || scan_node_->num_materialized_partition_keys() > 0))) { // Missing columns or row delimiter at end of the file is ok, fill the row in. 
- char* col = boundary_column_.str().ptr; + char* col = boundary_column_.buffer(); int num_fields = 0; - delimited_text_parser_->FillColumns<true>(boundary_column_.Size(), - &col, &num_fields, &field_locations_[0]); + RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(), + &col, &num_fields, &field_locations_[0])); MemPool* pool; TupleRow* tuple_row_mem; int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem); DCHECK_GE(max_tuples, 1); // Set variables for proper error outputting on boundary tuple - batch_start_ptr_ = boundary_row_.str().ptr; - row_end_locations_[0] = batch_start_ptr_ + boundary_row_.str().len; + batch_start_ptr_ = boundary_row_.buffer(); + row_end_locations_[0] = batch_start_ptr_ + boundary_row_.len(); int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1); DCHECK_LE(num_tuples, 1); DCHECK_GE(num_tuples, 0); @@ -371,7 +371,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { (num_fields > 0 || *num_tuples > 0)) { // There can be one partial tuple which returned no more fields from this buffer. DCHECK_LE(*num_tuples, num_fields + 1); - if (!boundary_column_.Empty()) { + if (!boundary_column_.IsEmpty()) { RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool)); boundary_column_.Clear(); } @@ -423,11 +423,11 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) { Status status; if (num_bytes > 0) { stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), - &byte_buffer_read_size_, &status); + &byte_buffer_read_size_, &status); } else { DCHECK_EQ(num_bytes, 0); status = stream_->GetBuffer(false, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), - &byte_buffer_read_size_); + &byte_buffer_read_size_); } RETURN_IF_ERROR(status); *eosr = stream_->eosr(); @@ -555,7 +555,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) { } // Need to read the entire file. - if (file_size < byte_buffer_read_size_) { + if (file_size > byte_buffer_read_size_) { stringstream ss; ss << "Expected to read a compressed text file of size " << file_size << " bytes. " << "But only read " << byte_buffer_read_size_ << " bytes. This may indicate " @@ -600,8 +600,8 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) { delimited_text_parser_->ParserReset(); SCOPED_TIMER(parse_delimiter_timer_); - int next_tuple_offset = 0; - int bytes_left = byte_buffer_read_size_; + int64_t next_tuple_offset = 0; + int64_t bytes_left = byte_buffer_read_size_; while (num_skipped_rows < num_rows_to_skip) { next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_, bytes_left); @@ -610,7 +610,6 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) { bytes_left -= next_tuple_offset; ++num_skipped_rows; } - if (next_tuple_offset != -1) *tuple_found = true; } while (!*tuple_found && !eosr); @@ -681,7 +680,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) { // codegen'd using the IRBuilder for the specific tuple description. This function // is then injected into the cross-compiled driving function, WriteAlignedTuples(). 
Function* HdfsTextScanner::Codegen(HdfsScanNode* node, - const vector<ExprContext*>& conjunct_ctxs) { + const vector<ExprContext*>& conjunct_ctxs) { if (!node->runtime_state()->codegen_enabled()) return NULL; LlvmCodeGen* codegen; if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL; @@ -717,9 +716,9 @@ void HdfsTextScanner::LogRowParseError(int row_idx, stringstream* ss) { row_start = row_end_locations_[row_idx - 1] + 1; } - if (!boundary_row_.Empty()) { - // Log the beginning of the line from the previous file buffer(s) - *ss << boundary_row_.str(); + if (!boundary_row_.IsEmpty()) { + // Log the beginning of the line from the previous file buffer(s). + *ss << string(boundary_row_.buffer(), boundary_row_.len()); } // Log the erroneous line (or the suffix of a line if !boundary_line.empty()). *ss << string(row_start, row_end - row_start); @@ -792,7 +791,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, // Write complete tuples. The current field, if any, is at the start of a tuple. if (num_tuples > 0) { int max_added_tuples = (scan_node_->limit() == -1) ? - num_tuples : scan_node_->limit() - scan_node_->rows_returned(); + num_tuples : scan_node_->limit() - scan_node_->rows_returned(); int tuples_returned = 0; // Call jitted function if possible if (write_tuples_fn_ != NULL) { @@ -836,31 +835,29 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) { bool needs_escape = data->len < 0; int copy_len = needs_escape ? -data->len : data->len; - int total_len = copy_len + boundary_column_.Size(); + int64_t total_len = copy_len + boundary_column_.len(); char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len)); if (UNLIKELY(str_data == NULL)) { string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate " "$0 bytes.", total_len); return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len); } - memcpy(str_data, boundary_column_.str().ptr, boundary_column_.Size()); - memcpy(str_data + boundary_column_.Size(), data->start, copy_len); + memcpy(str_data, boundary_column_.buffer(), boundary_column_.len()); + memcpy(str_data + boundary_column_.len(), data->start, copy_len); data->start = str_data; data->len = needs_escape ? 
-total_len : total_len; return Status::OK(); } -int HdfsTextScanner::WritePartialTuple(FieldLocation* fields, +void HdfsTextScanner::WritePartialTuple(FieldLocation* fields, int num_fields, bool copy_strings) { - int next_line_offset = 0; for (int i = 0; i < num_fields; ++i) { - int need_escape = false; + bool need_escape = false; int len = fields[i].len; if (len < 0) { len = -len; need_escape = true; } - next_line_offset += (len + 1); const SlotDescriptor* desc = scan_node_->materialized_slots()[slot_idx_]; if (!text_converter_->WriteSlot(desc, partial_tuple_, @@ -870,5 +867,4 @@ int HdfsTextScanner::WritePartialTuple(FieldLocation* fields, } ++slot_idx_; } - return next_line_offset; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index 997637d..dae104d 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -156,9 +156,9 @@ class HdfsTextScanner : public HdfsScanner { int WriteFields(MemPool*, TupleRow* tuple_row_mem, int num_fields, int num_tuples); /// Utility function to write out 'num_fields' to 'tuple_'. This is used to parse - /// partial tuples. Returns bytes processed. If copy_strings is true, strings - /// from fields will be copied into the boundary pool. - int WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings); + /// partial tuples. If copy_strings is true, strings from fields will be copied into + /// the boundary pool. + void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings); /// Appends the current file and line to the RuntimeState's error log. /// row_idx is 0-based (in current batch) where the parse error occurred. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 3c78d88..6a5081d 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -234,15 +234,9 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len, boundary_buffer_->Clear(); } } - // Workaround IMPALA-1619. Fail the request if requested_len is more than 1GB. - // StringBuffer can only handle 32-bit allocations and StringBuffer::Append() - // will allocate twice the current buffer size, cause int overflow. - // TODO: Revert once IMPALA-1619 is fixed. - if (UNLIKELY(requested_len > StringValue::MAX_LENGTH)) { - LOG(WARNING) << "Requested buffer size " << requested_len << "B > 1GB." - << GetStackTrace(); - return Status(Substitute("Requested buffer size $0B > 1GB", requested_len)); - } + + // Resize the buffer to the right size. + RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len)); while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) { // We need to fetch more bytes.
Copy the end of the current buffer and fetch the next @@ -273,8 +267,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len, } else { RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes)); boundary_buffer_bytes_left_ += num_bytes; - boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) + - boundary_buffer_->Size() - boundary_buffer_bytes_left_; + boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) + + boundary_buffer_->len() - boundary_buffer_bytes_left_; io_buffer_bytes_left_ -= num_bytes; io_buffer_pos_ += num_bytes; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index 8265ea2..a9e95cd 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -303,22 +303,14 @@ bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers) } bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) { - // Workaround IMPALA-1619. Return immediately if the allocation size will cause - // an arithmetic overflow. - if (UNLIKELY(size >= (1LL << 31))) { - // IMPALA-3238: don't repeatedly log warning when bumping up against this limit for - // large hash tables. - if (!client->logged_large_allocation_warning_) { - LOG(WARNING) << "Trying to allocate memory >=2GB (" << size << ")B." - << GetStackTrace(); - client->logged_large_allocation_warning_ = true; - } + int64_t buffers_needed = BitUtil::Ceil(size, max_block_size()); + if (UNLIKELY(!BitUtil::IsNonNegative32Bit(buffers_needed))) { + VLOG_QUERY << "Trying to consume " << size << " which is out of range."; return false; } - int buffers_needed = BitUtil::Ceil(size, max_block_size()); DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory"; - unique_lock<mutex> lock(lock_); + unique_lock<mutex> lock(lock_); if (size < max_block_size() && mem_tracker_->TryConsume(size)) { // For small allocations (less than a block size), just let the allocation through. client->tracker_->ConsumeLocal(size, client->query_tracker_); @@ -593,7 +585,7 @@ int BufferedBlockMgr::num_pinned_buffers(Client* client) const { } int BufferedBlockMgr::num_reserved_buffers_remaining(Client* client) const { - return max(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0); + return max<int>(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0); } MemTracker* BufferedBlockMgr::get_tracker(Client* client) const { @@ -1017,7 +1009,8 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { // 1. In the unpinned list. The buffer will not be in the free list. // 2. in_write_ == true. The buffer will not be in the free list. // 3. The buffer is free, but hasn't yet been reassigned to a different block. 
- DCHECK_EQ(block->buffer_desc_->len, max_block_size()) << "Non-I/O blocks are always pinned"; + DCHECK_EQ(block->buffer_desc_->len, max_block_size()) + << "Non-I/O blocks are always pinned"; DCHECK(unpinned_blocks_.Contains(block) || block->in_write_ || free_io_buffers_.Contains(block->buffer_desc_)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/collection-value-builder.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h index c57b546..4065b80 100644 --- a/be/src/runtime/collection-value-builder.h +++ b/be/src/runtime/collection-value-builder.h @@ -32,7 +32,7 @@ class CollectionValueBuilder { CollectionValueBuilder(CollectionValue* coll_value, const TupleDescriptor& tuple_desc, MemPool* pool, RuntimeState* state, - int initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY) + int64_t initial_tuple_capacity = DEFAULT_INITIAL_TUPLE_CAPACITY) : coll_value_(coll_value), tuple_desc_(tuple_desc), pool_(pool), http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/free-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/free-pool-test.cc b/be/src/runtime/free-pool-test.cc index 02d245a..c8ff93d 100644 --- a/be/src/runtime/free-pool-test.cc +++ b/be/src/runtime/free-pool-test.cc @@ -85,6 +85,22 @@ TEST(FreePoolTest, Basic) { EXPECT_EQ(mem_pool.total_allocated_bytes(), 64); mem_pool.FreeAll(); + + // Try making allocations larger than 1GB. + uint8_t* p5 = pool.Allocate(1LL << 32); + EXPECT_TRUE(p5 != NULL); + for (int64_t i = 0; i < (1LL << 32); i += (1 << 29)) { + *(p5 + i) = i; + } + EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8); + + // Test zero-byte allocation. + p5 = pool.Allocate(0); + EXPECT_TRUE(p5 != NULL); + EXPECT_EQ(mem_pool.total_allocated_bytes(), (1LL << 32) + 8); + pool.Free(p5); + + mem_pool.FreeAll(); } // In this test we make two allocations at increasing sizes and then we @@ -96,13 +112,13 @@ TEST(FreePoolTest, Loop) { MemPool mem_pool(&tracker); FreePool pool(&mem_pool); - map<int, pair<uint8_t*, uint8_t*> > primed_allocations; - vector<int> allocation_sizes; + map<int64_t, pair<uint8_t*, uint8_t*> > primed_allocations; + vector<int64_t> allocation_sizes; int64_t expected_pool_size = 0; // Pick a non-power of 2 to exercise more code. - for (int size = 3; size < 1024 * 1024 * 1024; size *= 3) { + for (int64_t size = 5; size < 6LL * 1024 * 1024 * 1024; size *= 5) { uint8_t* p1 = pool.Allocate(size); uint8_t* p2 = pool.Allocate(size); EXPECT_TRUE(p1 != NULL); @@ -163,6 +179,11 @@ TEST(FreePoolTest, ReAlloc) { ptr = pool.Allocate(600); EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8); + // Try allocation larger than 1GB. + uint8_t* ptr4 = pool.Reallocate(ptr3, 1LL << 32); + EXPECT_TRUE(ptr3 != ptr4); + EXPECT_EQ(mem_pool.total_allocated_bytes(), 1024 + 8 + 2048 + 8 + (1LL << 32) + 8); + mem_pool.FreeAll(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/free-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h index dfabaf0..90df749 100644 --- a/be/src/runtime/free-pool.h +++ b/be/src/runtime/free-pool.h @@ -52,8 +52,9 @@ class FreePool { memset(&lists_, 0, sizeof(lists_)); } - /// Allocates a buffer of size. 
- uint8_t* Allocate(int size) { + /// Allocates a buffer of size between [0, 2^62 - 1 - sizeof(FreeListNode)] bytes. + uint8_t* Allocate(int64_t size) { + DCHECK_GE(size, 0); #ifndef NDEBUG static int32_t alloc_counts = 0; if (FLAGS_stress_free_pool_alloc > 0 && @@ -64,16 +65,15 @@ class FreePool { ++net_allocations_; if (FLAGS_disable_mem_pools) return reinterpret_cast<uint8_t*>(malloc(size)); - /// This is the typical malloc behavior. NULL is reserved for failures. - if (size == 0) return reinterpret_cast<uint8_t*>(0x1); + /// Return a non-NULL dummy pointer. NULL is reserved for failures. + if (UNLIKELY(size == 0)) return mem_pool_->EmptyAllocPtr(); int free_list_idx = Bits::Log2Ceiling64(size); DCHECK_LT(free_list_idx, NUM_LISTS); - FreeListNode* allocation = lists_[free_list_idx].next; if (allocation == NULL) { // There wasn't an existing allocation of the right size, allocate a new one. - size = 1 << free_list_idx; + size = 1LL << free_list_idx; allocation = reinterpret_cast<FreeListNode*>( mem_pool_->Allocate(size + sizeof(FreeListNode))); if (UNLIKELY(allocation == NULL)) { @@ -97,7 +97,7 @@ class FreePool { free(ptr); return; } - if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return; + if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return; FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode)); FreeListNode* list = node->list; #ifndef NDEBUG @@ -114,7 +114,7 @@ class FreePool { /// /// NULL will be returned on allocation failure. It's the caller's responsibility to /// free the memory buffer pointed to by "ptr" in this case. - uint8_t* Reallocate(uint8_t* ptr, int size) { + uint8_t* Reallocate(uint8_t* ptr, int64_t size) { #ifndef NDEBUG static int32_t alloc_counts = 0; if (FLAGS_stress_free_pool_alloc > 0 && @@ -125,13 +125,14 @@ class FreePool { if (FLAGS_disable_mem_pools) { return reinterpret_cast<uint8_t*>(realloc(reinterpret_cast<void*>(ptr), size)); } - if (ptr == NULL || reinterpret_cast<int64_t>(ptr) == 0x1) return Allocate(size); + if (UNLIKELY(ptr == NULL || ptr == mem_pool_->EmptyAllocPtr())) return Allocate(size); FreeListNode* node = reinterpret_cast<FreeListNode*>(ptr - sizeof(FreeListNode)); FreeListNode* list = node->list; #ifndef NDEBUG CheckValidAllocation(list, ptr); #endif int bucket_idx = (list - &lists_[0]); + DCHECK_LT(bucket_idx, NUM_LISTS); // This is the actual size of ptr. int allocation_size = 1 << bucket_idx; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/mem-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc index ab6bd5a..121098c 100644 --- a/be/src/runtime/mem-pool-test.cc +++ b/be/src/runtime/mem-pool-test.cc @@ -21,6 +21,9 @@ #include "common/names.h" +// A large allocation size which exceeds 32 bits. +#define LARGE_ALLOC_SIZE (1LL << 32) namespace impala { // Utility class to call private functions on MemPool. @@ -125,6 +128,11 @@ TEST(MemPoolTest, Basic) { p2.FreeAll(); p3.FreeAll(); } + + // Test zero byte allocation. + uint8_t* ptr = p.Allocate(0); + EXPECT_TRUE(ptr != NULL); + EXPECT_EQ(0, p.GetTotalChunkSizes()); } // Test that we can keep an allocated chunk and a free chunk. @@ -192,6 +200,22 @@ TEST(MemPoolTest, ReturnPartial) { EXPECT_EQ(2, ptr[i]); } + // Try ReturnPartialAllocation() with 64-bit values.
+ uint8_t* ptr4 = p.Allocate(LARGE_ALLOC_SIZE + 512); + EXPECT_EQ(1024 + LARGE_ALLOC_SIZE + 512, p.total_allocated_bytes()); + memset(ptr4, 3, 512 * 2); + p.ReturnPartialAllocation(LARGE_ALLOC_SIZE); + uint8_t* ptr5 = p.Allocate(512); + EXPECT_TRUE(ptr5 == ptr4 + 512); + memset(ptr5, 4, 512); + + for (int i = 0; i < 512; ++i) { + EXPECT_EQ(3, ptr4[i]); + } + for (int i = 512; i < 512 * 2; ++i) { + EXPECT_EQ(4, ptr4[i]); + } + p.FreeAll(); } @@ -252,51 +276,50 @@ TEST(MemPoolTest, Limits) { ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false)); // Try to allocate 20 bytes, this should succeed. TryAllocate() should leave the - // pool in a functional state.. + // pool in a functional state. result = p2->TryAllocate(20); ASSERT_TRUE(result != NULL); ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false)); - p2->FreeAll(); delete p2; } TEST(MemPoolTest, MaxAllocation) { - int64_t int_max_rounded = BitUtil::RoundUp(INT_MAX, 8); + int64_t int_max_rounded = BitUtil::RoundUp(LARGE_ALLOC_SIZE, 8); - // Allocate a single INT_MAX chunk + // Allocate a single LARGE_ALLOC_SIZE chunk MemTracker tracker; MemPool p1(&tracker); - uint8_t* ptr = p1.Allocate(INT_MAX); + uint8_t* ptr = p1.Allocate(LARGE_ALLOC_SIZE); EXPECT_TRUE(ptr != NULL); EXPECT_EQ(int_max_rounded, p1.GetTotalChunkSizes()); EXPECT_EQ(int_max_rounded, p1.total_allocated_bytes()); p1.FreeAll(); - // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by an INT_MAX chunk + // Allocate a small chunk (INITIAL_CHUNK_SIZE) followed by a LARGE_ALLOC_SIZE chunk MemPool p2(&tracker); p2.Allocate(8); EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, p2.GetTotalChunkSizes()); EXPECT_EQ(8, p2.total_allocated_bytes()); - ptr = p2.Allocate(INT_MAX); + ptr = p2.Allocate(LARGE_ALLOC_SIZE); EXPECT_TRUE(ptr != NULL); EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE + int_max_rounded, p2.GetTotalChunkSizes()); EXPECT_EQ(8LL + int_max_rounded, p2.total_allocated_bytes()); p2.FreeAll(); - // Allocate three INT_MAX chunks followed by a small chunk followed by another INT_MAX - // chunk + // Allocate three LARGE_ALLOC_SIZE chunks followed by a small chunk + // followed by another LARGE_ALLOC_SIZE chunk.
MemPool p3(&tracker); - p3.Allocate(INT_MAX); + p3.Allocate(LARGE_ALLOC_SIZE); // Allocates new int_max_rounded chunk - ptr = p3.Allocate(INT_MAX); + ptr = p3.Allocate(LARGE_ALLOC_SIZE); EXPECT_TRUE(ptr != NULL); EXPECT_EQ(int_max_rounded * 2, p3.GetTotalChunkSizes()); EXPECT_EQ(int_max_rounded * 2, p3.total_allocated_bytes()); // Allocates new int_max_rounded chunk - ptr = p3.Allocate(INT_MAX); + ptr = p3.Allocate(LARGE_ALLOC_SIZE); EXPECT_TRUE(ptr != NULL); EXPECT_EQ(int_max_rounded * 3, p3.GetTotalChunkSizes()); EXPECT_EQ(int_max_rounded * 3, p3.total_allocated_bytes()); @@ -308,7 +331,7 @@ TEST(MemPoolTest, MaxAllocation) { EXPECT_EQ(int_max_rounded * 3 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes()); EXPECT_EQ(int_max_rounded * 3 + 8, p3.total_allocated_bytes()); // Allocates new int_max_rounded chunk - ptr = p3.Allocate(INT_MAX); + ptr = p3.Allocate(LARGE_ALLOC_SIZE); EXPECT_TRUE(ptr != NULL); EXPECT_EQ(int_max_rounded * 4 + MemPoolTest::MAX_CHUNK_SIZE, p3.GetTotalChunkSizes()); EXPECT_EQ(int_max_rounded * 4 + 8, p3.total_allocated_bytes()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/mem-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h index 15325c0..e1c4d2b 100644 --- a/be/src/runtime/mem-pool.h +++ b/be/src/runtime/mem-pool.h @@ -101,7 +101,7 @@ class MemPool { /// Returns 'byte_size' bytes of the current chunk back to the mem pool. This can /// only be used to return either all or part of the previous allocation returned /// by Allocate(). - void ReturnPartialAllocation(int byte_size) { + void ReturnPartialAllocation(int64_t byte_size) { DCHECK_GE(byte_size, 0); DCHECK(current_chunk_idx_ != -1); ChunkInfo& info = chunks_[current_chunk_idx_]; @@ -110,6 +110,11 @@ class MemPool { total_allocated_bytes_ -= byte_size; } + /// Return a dummy pointer for zero-length allocations. + static uint8_t* EmptyAllocPtr() { + return reinterpret_cast<uint8_t*>(&zero_length_region_); + } + /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
void Clear(); @@ -208,6 +213,7 @@ class MemPool { template <bool CHECK_LIMIT_FIRST> uint8_t* Allocate(int64_t size) noexcept { + DCHECK_GE(size, 0); if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t *>(&zero_length_region_); int64_t num_bytes = BitUtil::RoundUp(size, 8); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/string-buffer-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/string-buffer-test.cc b/be/src/runtime/string-buffer-test.cc index 95d2e41..2387d60 100644 --- a/be/src/runtime/string-buffer-test.cc +++ b/be/src/runtime/string-buffer-test.cc @@ -24,10 +24,10 @@ namespace impala { void ValidateString(const string& std_str, const StringBuffer& str) { - EXPECT_EQ(std_str.empty(), str.Empty()); - EXPECT_EQ((int)std_str.size(), str.Size()); + EXPECT_EQ(std_str.empty(), str.IsEmpty()); + EXPECT_EQ(static_cast<int64_t>(std_str.size()), str.len()); if (std_str.size() > 0) { - EXPECT_EQ(strncmp(std_str.c_str(), str.str().ptr, std_str.size()), 0); + EXPECT_EQ(strncmp(std_str.c_str(), str.buffer(), std_str.size()), 0); } } @@ -55,11 +55,6 @@ TEST(StringBufferTest, Basic) { str.Append("World", strlen("World")); ValidateString(std_str, str); - // Assign - std_str.assign("foo"); - str.Assign("foo", strlen("foo")); - ValidateString(std_str, str); - // Clear std_str.clear(); str.Clear(); @@ -72,23 +67,22 @@ TEST(StringBufferTest, Basic) { } TEST(StringBufferTest, AppendBoundary) { - // Test StringBuffer::Append() up to 1GB is ok - // TODO: Once IMPALA-1619 is fixed, we should change the test to verify - // append over 2GB string is supported. + // Test StringBuffer::Append() works beyond 1GB. MemTracker tracker; MemPool pool(&tracker); StringBuffer str(&pool); string std_str; const int64_t chunk_size = 8 * 1024 * 1024; + const int64_t max_data_size = 1LL << 32; std_str.resize(chunk_size, 'a'); int64_t data_size = 0; - while (data_size + chunk_size <= StringValue::MAX_LENGTH) { + while (data_size + chunk_size <= max_data_size) { str.Append(std_str.c_str(), chunk_size); data_size += chunk_size; } EXPECT_EQ(str.buffer_size(), data_size); - std_str.resize(StringValue::MAX_LENGTH, 'a'); + std_str.resize(max_data_size, 'a'); ValidateString(std_str, str); pool.FreeAll(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/runtime/string-buffer.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h index ec59806..3725181 100644 --- a/be/src/runtime/string-buffer.h +++ b/be/src/runtime/string-buffer.h @@ -27,99 +27,87 @@ namespace impala { /// Dynamic-sizable string (similar to std::string) but without as many /// copies and allocations. -/// StringBuffer wraps a StringValue object with a pool and memory buffer length. -/// It supports a subset of the std::string functionality but will only allocate -/// bigger string buffers as necessary. std::string tries to be immutable and will -/// reallocate very often. std::string should be avoided in all hot paths. +/// StringBuffer is a buffer of char allocated from 'pool'. Current usage and size of the +/// buffer are tracked in 'len_' and 'buffer_size_' respectively. It supports a subset of +/// the std::string functionality but will only allocate bigger string buffers as +/// necessary. std::string tries to be immutable and will reallocate very often. +/// std::string should be avoided in all hot paths. 
class StringBuffer { public: /// C'tor for StringBuffer. Memory backing the string will be allocated from /// the pool as necessary. Can optionally be initialized from a StringValue. StringBuffer(MemPool* pool, StringValue* str = NULL) - : pool_(pool), buffer_size_(0) { + : pool_(pool), buffer_(NULL), len_(0), buffer_size_(0) { DCHECK(pool_ != NULL); if (str != NULL) { - string_value_ = *str; - buffer_size_ = str->len; + buffer_ = str->ptr; + len_ = buffer_size_ = str->len; } } /// Append 'str' to the current string, allocating a new buffer as necessary. - /// Return error status if memory limit is exceeded. - Status Append(const char* str, int len) { - int new_len = len + string_value_.len; + Status Append(const char* str, int64_t str_len) { + int64_t new_len = len_ + str_len; if (new_len > buffer_size_) RETURN_IF_ERROR(GrowBuffer(new_len)); - memcpy(string_value_.ptr + string_value_.len, str, len); - string_value_.len = new_len; + memcpy(buffer_ + len_, str, str_len); + len_ += str_len; return Status::OK(); } - /// TODO: switch everything to uint8_t? - Status Append(const uint8_t* str, int len) { - return Append(reinterpret_cast<const char*>(str), len); + /// Wrapper around Append() for input type 'uint8_t'. + Status Append(const uint8_t* str, int64_t str_len) { + return Append(reinterpret_cast<const char*>(str), str_len); } - /// Assigns contents to StringBuffer. Return error status if memory limit is exceeded. - Status Assign(const char* str, int len) { - Clear(); - return Append(str, len); - } - - /// Clear the underlying StringValue. The allocated buffer can be reused. - void Clear() { - string_value_.len = 0; - } + /// Clear the buffer's contents. The allocated buffer can be reused. + void Clear() { len_ = 0; } - /// Clears the underlying buffer and StringValue + /// Reset the usage and size of the buffer. Note that the allocated buffer is + /// retained but cannot be reused. void Reset() { - string_value_.len = 0; + len_ = 0; buffer_size_ = 0; + buffer_ = NULL; } - /// Returns whether the current string is empty - bool Empty() const { - return string_value_.len == 0; + /// Returns true if no bytes are consumed in the buffer. + bool IsEmpty() const { return len_ == 0; } + + /// Grows the buffer to be at least 'new_size', copying over the previous data + /// into the new buffer. The old buffer is not freed. Return an error status if + /// growing the buffer will exceed the memory limit. + Status GrowBuffer(int64_t new_size) { + if (LIKELY(new_size > buffer_size_)) { + int64_t old_size = buffer_size_; + buffer_size_ = std::max<int64_t>(buffer_size_ * 2, new_size); + char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_)); + if (UNLIKELY(new_buffer == NULL)) { + string details = Substitute("StringBuffer failed to grow buffer from $0 " + "to $1 bytes.", old_size, buffer_size_); + return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_); + } + if (LIKELY(len_ > 0)) memcpy(new_buffer, buffer_, len_); + buffer_ = new_buffer; + } + return Status::OK(); } - /// Returns the length of the current string - int Size() const { - return string_value_.len; - } + /// Returns the number of bytes consumed in the buffer. + int64_t len() const { return len_; } - /// Returns the underlying StringValue - const StringValue& str() const { - return string_value_; - } + /// Returns the pointer to the buffer. Note that it's the caller's responsibility + /// to not retain the pointer to 'buffer_' across calls to Append() as the buffer_ + /// may be relocated in Append().
+ char* buffer() const { return buffer_; } - /// Returns the buffer size - int buffer_size() const { - return buffer_size_; - } + /// Returns the size of the buffer. + int64_t buffer_size() const { return buffer_size_; } private: - /// Grows the buffer backing the string to be at least new_size, copying over the - /// previous string data into the new buffer. Return error status if memory limit - /// is exceeded. - Status GrowBuffer(int new_len) { - // TODO: Release/reuse old buffers somehow - buffer_size_ = std::max(buffer_size_ * 2, new_len); - DCHECK_LE(buffer_size_, StringValue::MAX_LENGTH); - char* new_buffer = reinterpret_cast<char*>(pool_->TryAllocate(buffer_size_)); - if (UNLIKELY(new_buffer == NULL)) { - string details = Substitute("StringBuffer failed to grow buffer by $0 bytes.", - buffer_size_); - return pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_size_); - } - if (LIKELY(string_value_.len > 0)) { - memcpy(new_buffer, string_value_.ptr, string_value_.len); - } - string_value_.ptr = new_buffer; - return Status::OK(); - } - MemPool* pool_; - StringValue string_value_; - int buffer_size_; + char* buffer_; + int64_t len_; // number of bytes consumed in the buffer. + int64_t buffer_size_; // size of the buffer. }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf-internal.h ---------------------------------------------------------------------- diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h index 599e42f..1996838 100644 --- a/be/src/udf/udf-internal.h +++ b/be/src/udf/udf-internal.h @@ -79,7 +79,7 @@ class FunctionContextImpl { /// FreeLocalAllocations(). This is used where the lifetime of the allocation is clear. /// For UDFs, the allocations can be freed at the row level. /// TODO: free them at the batch level and save some copies? - uint8_t* AllocateLocal(int byte_size) noexcept; + uint8_t* AllocateLocal(int64_t byte_size) noexcept; /// Frees all allocations returned by AllocateLocal(). void FreeLocalAllocations() noexcept; @@ -121,7 +121,7 @@ class FunctionContextImpl { /// if necessary. /// /// Return false if 'buf' is null; returns true otherwise. - bool CheckAllocResult(const char* fn_name, uint8_t* buf, int byte_size); + bool CheckAllocResult(const char* fn_name, uint8_t* buf, int64_t byte_size); /// Preallocated buffer for storing varargs (if the function has any). Allocated and /// owned by this object, but populated by an Expr function. 
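The StringBuffer rewrite a few hunks up centers on the new GrowBuffer()/Append() shape, and the same doubling policy applies to any pool-backed byte buffer. Below is a minimal self-contained sketch of it; GrowableBuffer is a hypothetical name, plain malloc/free stand in for MemPool::TryAllocate(), and the MemLimitExceeded() error path is reduced to a bool:

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>

// Minimal sketch of the StringBuffer growth policy, with malloc/free standing
// in for MemPool::TryAllocate(); in the real class the old chunk stays owned
// by the MemPool instead of being freed here.
class GrowableBuffer {
 public:
  GrowableBuffer() : buffer_(nullptr), len_(0), buffer_size_(0) {}
  ~GrowableBuffer() { free(buffer_); }

  // Grow to at least 'new_size' bytes: at minimum double the current size,
  // then copy the used prefix into the new allocation.
  bool GrowBuffer(int64_t new_size) {
    if (new_size <= buffer_size_) return true;
    const int64_t target = std::max<int64_t>(buffer_size_ * 2, new_size);
    char* new_buffer = static_cast<char*>(malloc(static_cast<size_t>(target)));
    if (new_buffer == nullptr) return false;  // MemLimitExceeded() in the real code
    if (len_ > 0) memcpy(new_buffer, buffer_, static_cast<size_t>(len_));
    free(buffer_);
    buffer_ = new_buffer;
    buffer_size_ = target;
    return true;
  }

  bool Append(const char* data, int64_t data_len) {
    if (len_ + data_len > buffer_size_ && !GrowBuffer(len_ + data_len)) return false;
    memcpy(buffer_ + len_, data, static_cast<size_t>(data_len));
    len_ += data_len;
    return true;
  }

  int64_t len() const { return len_; }
  int64_t buffer_size() const { return buffer_size_; }

 private:
  char* buffer_;
  int64_t len_;          // bytes consumed
  int64_t buffer_size_;  // bytes allocated
};

Doubling keeps Append() amortized O(1) per appended byte, and with 64-bit sizes the doubling can no longer overflow an int — which is why ScannerContext::Stream::GetBytesInternal() can now pre-size with GrowBuffer(requested_len) instead of rejecting requests above StringValue::MAX_LENGTH as the old workaround did.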
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf.cc ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc index 4cadb63..ff81c9c 100644 --- a/be/src/udf/udf.cc +++ b/be/src/udf/udf.cc @@ -267,7 +267,7 @@ const char* FunctionContext::error_msg() const { } inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name, - uint8_t* buf, int byte_size) { + uint8_t* buf, int64_t byte_size) { if (UNLIKELY(buf == NULL)) { stringstream ss; ss << string(fn_name) << "() failed to allocate " << byte_size << " bytes."; @@ -416,7 +416,7 @@ void FunctionContext::SetFunctionState(FunctionStateScope scope, void* ptr) { } } -uint8_t* FunctionContextImpl::AllocateLocal(int byte_size) noexcept { +uint8_t* FunctionContextImpl::AllocateLocal(int64_t byte_size) noexcept { assert(!closed_); if (byte_size == 0) return NULL; uint8_t* buffer = pool_->Allocate(byte_size); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/udf/udf.h ---------------------------------------------------------------------- diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 1b1cdab..210c855 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -140,6 +140,7 @@ class FunctionContext { /// The UDF/UDA is responsible for calling Free() on all buffers returned by Allocate(). /// If Allocate() fails or causes the memory limit to be exceeded, the error will be /// set in this object causing the query to fail. + /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756. uint8_t* Allocate(int byte_size) noexcept; /// Wrapper around Allocate() to allocate a buffer of the given type "T". @@ -155,6 +156,7 @@ class FunctionContext { /// memory limit to be exceeded, the error will be set in this object. /// /// This should be used for buffers that constantly get appended to. + /// TODO: 'byte_size' should be 64-bit. See IMPALA-2756. uint8_t* Reallocate(uint8_t* ptr, int byte_size) noexcept; /// Frees a buffer returned from Allocate() or Reallocate() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/bit-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h index cae8212..14553ae 100644 --- a/be/src/util/bit-util.h +++ b/be/src/util/bit-util.h @@ -23,6 +23,7 @@ #endif #include <boost/type_traits/make_unsigned.hpp> +#include <limits> #include "common/compiler-util.h" #include "util/cpu-info.h" @@ -203,31 +204,36 @@ class BitUtil { static inline uint16_t FromBigEndian(uint16_t val) { return val; } #endif - // Logical right shift for signed integer types - // This is needed because the C >> operator does arithmetic right shift - // Negative shift amounts lead to undefined behavior + /// Returns true if 'value' is a non-negative 32-bit integer. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index cae8212..14553ae 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -23,6 +23,7 @@
 #endif

 #include <boost/type_traits/make_unsigned.hpp>
+#include <limits>

 #include "common/compiler-util.h"
 #include "util/cpu-info.h"
@@ -203,31 +204,36 @@ class BitUtil {
   static inline uint16_t FromBigEndian(uint16_t val) { return val; }
 #endif

-  // Logical right shift for signed integer types
-  // This is needed because the C >> operator does arithmetic right shift
-  // Negative shift amounts lead to undefined behavior
+  /// Returns true if 'value' is a non-negative 32-bit integer.
+  static inline bool IsNonNegative32Bit(int64_t value) {
+    return static_cast<uint64_t>(value) <= std::numeric_limits<int32_t>::max();
+  }
+
+  /// Logical right shift for signed integer types
+  /// This is needed because the C >> operator does arithmetic right shift
+  /// Negative shift amounts lead to undefined behavior
   template<typename T>
   static T ShiftRightLogical(T v, int shift) {
     // Conversion to unsigned ensures most significant bits always filled with 0's
     return static_cast<typename make_unsigned<T>::type>(v) >> shift;
   }

-  // Get an specific bit of a numeric type
+  /// Get a specific bit of a numeric type
   template<typename T>
   static inline int8_t GetBit(T v, int bitpos) {
     T masked = v & (static_cast<T>(0x1) << bitpos);
     return static_cast<int8_t>(ShiftRightLogical(masked, bitpos));
   }

-  // Set a specific bit to 1
-  // Behavior when bitpos is negative is undefined
+  /// Set a specific bit to 1
+  /// Behavior when bitpos is negative is undefined
   template<typename T>
   static T SetBit(T v, int bitpos) {
     return v | (static_cast<T>(0x1) << bitpos);
   }

-  // Set a specific bit to 0
-  // Behavior when bitpos is negative is undefined
+  /// Set a specific bit to 0
+  /// Behavior when bitpos is negative is undefined
   template<typename T>
   static T UnsetBit(T v, int bitpos) {
     return v & ~(static_cast<T>(0x1) << bitpos);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/codec.cc
----------------------------------------------------------------------
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 6863d1f..2ee7415 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -182,15 +182,13 @@ Status Codec::ProcessBlock32(bool output_preallocated, int input_length,
     const uint8_t* input, int* output_length, uint8_t** output) {
   int64_t input_len64 = input_length;
   int64_t output_len64 = *output_length;
-  RETURN_IF_ERROR(ProcessBlock(output_preallocated, input_len64, input, &output_len64,
-      output));
-  // Check whether we are going to have an overflow if we are going to cast from int64_t
-  // to int.
-  // TODO: Is there a faster way to do this check?
-  if (UNLIKELY(output_len64 > numeric_limits<int>::max())) {
+  RETURN_IF_ERROR(
+      ProcessBlock(output_preallocated, input_len64, input, &output_len64, output));
+  // Buffer size should be in [0, 2^31 - 1] bytes.
+  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(output_len64))) {
     return Status(Substitute("Arithmetic overflow in codec function. Output length is $0",
         output_len64));
   }
-  *output_length = static_cast<int32_t>(output_len64);
+  *output_length = static_cast<int>(output_len64);
   return Status::OK();
 }
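The new BitUtil::IsNonNegative32Bit() replaces the old explicit overflow comment with a single unsigned comparison: casting a negative int64_t to uint64_t wraps it far above INT32_MAX, so one test rejects both negative and too-large values. A standalone check of that property (the helper mirrors the function above; the test values are mine):

#include <cassert>
#include <cstdint>
#include <limits>

static inline bool IsNonNegative32Bit(int64_t value) {
  return static_cast<uint64_t>(value) <= std::numeric_limits<int32_t>::max();
}

int main() {
  assert(IsNonNegative32Bit(0));
  assert(IsNonNegative32Bit(std::numeric_limits<int32_t>::max()));
  assert(!IsNonNegative32Bit(-1));                // wraps to 2^64 - 1
  assert(!IsNonNegative32Bit(INT64_C(1) << 31));  // 2^31 exceeds INT32_MAX
  return 0;
}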
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index a337983..563a3b9 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -150,11 +150,6 @@ class Codec {

   bool supports_streaming() const { return supports_streaming_; }

-  /// Largest block we will compress/decompress: 2GB.
-  /// We are dealing with compressed blocks that are never this big but we want to guard
-  /// against a corrupt file that has the block length as some large number.
-  static const int MAX_BLOCK_SIZE = (2L * 1024 * 1024 * 1024) - 1;
-
  protected:
   /// Create a compression operator
   /// Inputs:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 3a32756..7a94db0 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -121,10 +121,8 @@ class DecompressorTest : public ::testing::Test {
     EXPECT_GE(max_compressed_length, 0);
     uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
     compressed_length = max_compressed_length;
-
-    EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
-        &compressed));
+    EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
+        &compressed));
   }

   output_len = decompressor->MaxOutputLen(compressed_length, compressed);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 8a2ea74..900f891 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -148,9 +148,6 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
   if (!reuse_buffer_ || out_buffer_ == NULL) {
     // guess that we will need 2x the input length.
     buffer_length_ = input_length * 2;
-    if (buffer_length_ > MAX_BLOCK_SIZE) {
-      return Status("Decompressor: block size is too big");
-    }
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
     if (UNLIKELY(out_buffer_ == NULL)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -204,11 +201,6 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
       // User didn't supply the buffer, double the buffer and try again.
       temp_memory_pool_->Clear();
       buffer_length_ *= 2;
-      if (buffer_length_ > MAX_BLOCK_SIZE) {
-        stringstream ss;
-        ss << "GzipDecompressor: block size is too big: " << buffer_length_;
-        return Status(ss.str());
-      }
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
@@ -273,9 +265,6 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
   } else if (!reuse_buffer_ || out_buffer_ == NULL) {
     // guess that we will need 2x the input length.
     buffer_length_ = input_length * 2;
-    if (buffer_length_ > MAX_BLOCK_SIZE) {
-      return Status("Decompressor: block size is too big");
-    }
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
     if (UNLIKELY(out_buffer_ == NULL)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
@@ -295,9 +284,6 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
       DCHECK(!output_preallocated);
       temp_memory_pool_->Clear();
       buffer_length_ = buffer_length_ * 2;
-      if (buffer_length_ > MAX_BLOCK_SIZE) {
-        return Status("Decompressor: block size is too big");
-      }
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == NULL)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
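With MAX_BLOCK_SIZE gone, the gzip/bzip retry loops above are bounded only by the process memory tracker: TryAllocate() returns NULL once the limit is hit, and the loop otherwise keeps doubling. A simplified, self-contained sketch of that pattern (the callbacks are stand-ins for the real zlib/bzip2 calls and MemPool::TryAllocate(); this is not the actual Impala code):

#include <cstdint>
#include <functional>

enum class Result { Ok, NeedMoreSpace, Error };

// Returns false on corrupt input or when the allocator hits its memory limit.
bool DecompressWithGrowth(
    const uint8_t* in, int64_t in_len, int64_t* buffer_len, uint8_t** out,
    const std::function<uint8_t*(int64_t)>& try_allocate,
    const std::function<Result(const uint8_t*, int64_t, uint8_t*, int64_t)>&
        try_decompress) {
  *buffer_len = in_len * 2;  // guess 2x the input, as the decompressors do
  while (true) {
    *out = try_allocate(*buffer_len);
    if (*out == nullptr) return false;      // memory limit exceeded
    Result r = try_decompress(in, in_len, *out, *buffer_len);
    if (r == Result::Ok) return true;
    if (r == Result::Error) return false;   // corrupt input
    *buffer_len *= 2;                       // output didn't fit: double and retry
  }
}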
Each block "contains the uncompressed length for -// the block followed by one of more length-prefixed blocks of compressed data." +// (BlockCompressorStream.java and BlockDecompressorStream.java) the input is split +// into blocks. Each block "contains the uncompressed length for the block followed +// by one of more length-prefixed blocks of compressed data." // This is essentially blocks of blocks. // The outer block consists of: -// - 4 byte little endian uncompressed_size +// - 4 byte big endian uncompressed_size // < inner blocks > // ... repeated until input_len is consumed .. // The inner blocks have: -// - 4-byte little endian compressed_size +// - 4-byte big endian compressed_size // < snappy compressed block > -// - 4-byte little endian compressed_size +// - 4-byte big endian compressed_size // < snappy compressed block > // ... repeated until uncompressed_size from outer block is consumed ... @@ -443,15 +430,6 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input, input += sizeof(uint32_t); input_len -= sizeof(uint32_t); - if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE) { - if (uncompressed_total_len == 0) { - // TODO: is this check really robust? - return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_BLOCK_SIZE, - uncompressed_block_len); - } - break; - } - if (!size_only) { int64_t remaining_output_size = *output_len - uncompressed_total_len; DCHECK_GE(remaining_output_size, uncompressed_block_len); @@ -464,29 +442,23 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input, input_len -= sizeof(uint32_t); if (compressed_len == 0 || compressed_len > input_len) { - if (uncompressed_total_len == 0) { - return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH); - } - input_len = 0; - break; + *output_len = 0; + return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH); } // Read how big the output will be. size_t uncompressed_len; if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input), - input_len, &uncompressed_len)) { - if (uncompressed_total_len == 0) { - return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED); - } - input_len = 0; - break; + compressed_len, &uncompressed_len)) { + *output_len = 0; + return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED); } DCHECK_GT(uncompressed_len, 0); if (!size_only) { // Decompress this snappy block if (!snappy::RawUncompress(reinterpret_cast<const char*>(input), - compressed_len, output)) { + compressed_len, output)) { return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED); } output += uncompressed_len; @@ -526,14 +498,6 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i *output = out_buffer_; } - if (*output_len > MAX_BLOCK_SIZE) { - // TODO: is this check really robust? - stringstream ss; - ss << "Decompressor: block size is too big. Data is likely corrupt. 
" - << "Size: " << *output_len; - return Status(ss.str()); - } - char* out_ptr = reinterpret_cast<char*>(*output); RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr)); return Status::OK(); @@ -547,7 +511,7 @@ int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input DCHECK(input != NULL); size_t result; if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input), - input_len, &result)) { + input_len, &result)) { return -1; } return result; @@ -560,9 +524,6 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_ if (uncompressed_length < 0) return Status("Snappy: GetUncompressedLength failed"); if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) { buffer_length_ = uncompressed_length; - if (buffer_length_ > MAX_BLOCK_SIZE) { - return Status("Decompressor: block size is too big"); - } out_buffer_ = memory_pool_->TryAllocate(buffer_length_); if (UNLIKELY(out_buffer_ == NULL)) { string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy", @@ -576,7 +537,7 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_ } if (!snappy::RawUncompress(reinterpret_cast<const char*>(input), - static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) { + static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) { return Status("Snappy: RawUncompress failed"); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 7815233..26ea78d 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -274,6 +274,9 @@ error_codes = ( ("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for " "data of type $1: $2"), + + ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum " + "supported length of 2147483647 bytes.") ) import sys http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/bin/create-load-data.sh ---------------------------------------------------------------------- diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh index 53cdefa..dd6f8ad 100755 --- a/testdata/bin/create-load-data.sh +++ b/testdata/bin/create-load-data.sh @@ -294,6 +294,11 @@ function load-custom-data { hadoop fs -put -f ${IMPALA_HOME}/testdata/tinytable_seq_snap/tinytable_seq_snap_header_only \ /test-warehouse/tinytable_seq_snap + # IMPALA-1619: payload compressed with snappy used for constructing large snappy block + # compressed file + hadoop fs -put -f ${IMPALA_HOME}/testdata/compressed_formats/compressed_payload.snap \ + /test-warehouse/compressed_payload.snap + beeline -n $USER -u "${JDBC_URL}" -f\ ${IMPALA_HOME}/testdata/avro_schema_resolution/create_table.sql } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/compressed_formats/README ---------------------------------------------------------------------- diff --git a/testdata/compressed_formats/README b/testdata/compressed_formats/README new file mode 100644 index 0000000..892cb80 --- /dev/null +++ b/testdata/compressed_formats/README @@ -0,0 +1,4 @@ +This folder contains a file necessary to test Impala's support for snappy block compressed +file larger than 1GB. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/compressed_formats/README
----------------------------------------------------------------------
diff --git a/testdata/compressed_formats/README b/testdata/compressed_formats/README
new file mode 100644
index 0000000..892cb80
--- /dev/null
+++ b/testdata/compressed_formats/README
@@ -0,0 +1,4 @@
+This folder contains a file necessary to test Impala's support for snappy block compressed
+files larger than 1GB. In particular, compressed_payload.snap is a string of 50176 bytes
+compressed using snappy. It's the building block for constructing a large snappy block
+compressed file. Please see test_compressed_formats.py for details.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/testdata/compressed_formats/compressed_payload.snap
----------------------------------------------------------------------
diff --git a/testdata/compressed_formats/compressed_payload.snap b/testdata/compressed_formats/compressed_payload.snap
new file mode 100644
index 0000000..20ac4ff
Binary files /dev/null and b/testdata/compressed_formats/compressed_payload.snap differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed5ec677/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 15f799c..c55c9e7 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -3,6 +3,9 @@
 import os
 import pytest
 import random
+import math
+import string
+import struct
 import subprocess
 from os.path import join
 from subprocess import call
@@ -144,19 +146,23 @@ class TestTableWriters(ImpalaTestSuite):

 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):
-  """ Tests that we gracefully handle when a compressed file in HDFS is larger
-  than 1GB.
-  This test creates a testing data file that is over 1GB and loads it to a table.
-  Then verifies Impala will gracefully fail the query.
-  TODO: Once IMPALA-1619 is fixed, modify the test to test > 2GB file."""
-
+  """
+  Tests that Impala handles compressed files in HDFS larger than 1GB.
+  This test creates a 2GB test data file and loads it into a table.
+  """
   TABLE_NAME = "large_compressed_file"
   TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file")
-  """ Name the file with ".snappy" extension to let scanner treat it
-  as a snappy compressed file."""
+  """
+  Name the file with ".snappy" extension to let the scanner treat it as
+  a snappy block compressed file.
+  """
   FILE_NAME = "largefile.snappy"
-  LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
-  MAX_FILE_SIZE = 1024 * 1024 * 1024
+  # Maximum uncompressed size of an outer block in a snappy block compressed file.
+  CHUNK_SIZE = 1024 * 1024 * 1024
+  # Limit the max file size to 2GB or too much memory may be needed when
+  # uncompressing the buffer. 2GB is sufficient to show that we support
+  # sizes beyond the maximum 32-bit signed value.
+  MAX_FILE_SIZE = 2 * CHUNK_SIZE

   @classmethod
   def get_workload(self):
@@ -170,48 +176,65 @@ class TestLargeCompressedFile(ImpalaTestSuite):
       pytest.skip("skipping if it's not exhaustive test.")
     cls.TestMatrix.add_constraint(lambda v:
         (v.get_value('table_format').file_format =='text' and
-        v.get_value('table_format').compression_codec == 'none'))
+        v.get_value('table_format').compression_codec == 'snap'))

   def teardown_method(self, method):
     self.__drop_test_table()

-  def __gen_char_or_num(self):
-    return random.choice(self.LETTERS)
-
   def __generate_file(self, file_name, file_size):
     """Generate file with random data and a specified size."""
-    s = ''
-    for j in range(1024):
-      s = s + self.__gen_char_or_num()
-    put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
-        stdin=subprocess.PIPE, bufsize=-1)
-    remain = file_size % 1024
-    for i in range(int(file_size / 1024)):
-      put.stdin.write(s)
-    put.stdin.write(s[0:remain])
-    put.stdin.close()
-    put.wait()
+
+    # Read the payload compressed using snappy. The compressed payload
+    # is generated from a string of 50176 bytes.
+    payload_size = 50176
+    hdfs_cat = subprocess.Popen(["hadoop", "fs", "-cat",
+        "/test-warehouse/compressed_payload.snap"], stdout=subprocess.PIPE)
+    compressed_payload = hdfs_cat.stdout.read()
+    compressed_size = len(compressed_payload)
+    hdfs_cat.stdout.close()
+    hdfs_cat.wait()
+
+    # The layout of a snappy-block compressed file is one or more
+    # of the following nested structure which is called "chunk" in
+    # the code below:
+    #
+    # - <big endian 32-bit value encoding the uncompressed size>
+    # - one or more blocks of the following structure:
+    #   - <big endian 32-bit value encoding the compressed size>
+    #   - <raw bits compressed by snappy algorithm>

+    # Number of nested structures described above.
+    num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE))
+    # Number of compressed snappy blocks per chunk.
+    num_blocks_per_chunk = self.CHUNK_SIZE / (compressed_size + 4)
+    # Total uncompressed size of a nested structure.
+    total_chunk_size = num_blocks_per_chunk * payload_size
+
+    hdfs_put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
+        stdin=subprocess.PIPE, bufsize=-1)
+    for i in range(num_chunks):
+      hdfs_put.stdin.write(struct.pack('>i', total_chunk_size))
+      for j in range(num_blocks_per_chunk):
+        hdfs_put.stdin.write(struct.pack('>i', compressed_size))
+        hdfs_put.stdin.write(compressed_payload)
+    hdfs_put.stdin.close()
+    hdfs_put.wait()
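The writer side of the same container format, which the test assembles above with struct.pack('>i', ...). A C++ rendition of building one chunk (the names and structure here are mine; the payload-size and framing math mirror the test):

#include <cstdint>
#include <string>

static void AppendBigEndianU32(std::string* out, uint32_t v) {
  out->push_back(static_cast<char>(v >> 24));
  out->push_back(static_cast<char>(v >> 16));
  out->push_back(static_cast<char>(v >> 8));
  out->push_back(static_cast<char>(v));
}

// Builds one chunk from 'num_blocks' copies of a pre-compressed payload whose
// uncompressed size is 'payload_size' bytes (50176 in the test).
std::string BuildChunk(const std::string& compressed_payload,
                       uint32_t payload_size, int num_blocks) {
  std::string chunk;
  // Outer header: total uncompressed size of the chunk.
  AppendBigEndianU32(&chunk, payload_size * static_cast<uint32_t>(num_blocks));
  for (int i = 0; i < num_blocks; ++i) {
    // Inner header: compressed size, followed by the snappy payload itself.
    AppendBigEndianU32(&chunk, static_cast<uint32_t>(compressed_payload.size()));
    chunk += compressed_payload;
  }
  return chunk;
}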
   def test_query_large_file(self, vector):
     self.__create_test_table();
     dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
-    file_size = self.MAX_FILE_SIZE + 1
+    file_size = self.MAX_FILE_SIZE
     self.__generate_file(dst_path, file_size)
     self.client.execute("refresh %s" % self.TABLE_NAME)

-    # Query the table and check for expected error.
-    expected_error = 'Requested buffer size %dB > 1GB' % file_size
-    try:
-      result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
-      assert False, "Query was expected to fail"
-    except Exception as e:
-      error_msg = str(e)
-      assert expected_error in error_msg
+    # Query the table
+    result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)

   def __create_test_table(self):
     self.__drop_test_table()
-    self.client.execute("CREATE TABLE %s (col string) LOCATION '%s'"
-        % (self.TABLE_NAME, self.TABLE_LOCATION))
+    self.client.execute("CREATE TABLE %s (col string) " \
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
+        % (self.TABLE_NAME, self.TABLE_LOCATION))

   def __drop_test_table(self):
     self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)
