Repository: incubator-impala
Updated Branches:
  refs/heads/master 1803b403e -> 6b37a793a
IMPALA-6137: fix text scanner split delim mem mgmt

The bug was that the buffer pointed to by byte_buffer_ptr_ could be freed by ReleaseCompletedResources() before CheckForSplitDelimiter() was called.

The simple fix is to copy out the single byte that is needed each time the buffer is filled.

Testing:
Ran exhaustive query tests under ASAN with --disable_mem_pools=true. Before the change test_text_split_delimiters reliably caused an ASAN failure when run with --disable_mem_pools=true.

We should get this coverage automatically once the I/O mgr switches to using the buffer pool, which uses ASAN poisoning on freed buffers.

Change-Id: Iddbb5cf6acc8f0b0e0b4c205c334f21e03d06f1c
Reviewed-on: http://gerrit.cloudera.org:8080/8438
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6b37a793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6b37a793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6b37a793

Branch: refs/heads/master
Commit: 6b37a793a0af74ba01c67bb5ac7285af7c5c4d52
Parents: 1803b40
Author: Tim Armstrong <[email protected]>
Authored: Thu Nov 2 13:04:49 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Nov 3 21:38:20 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-text-scanner.cc | 27 ++++++++++++++++++++-------
 be/src/exec/hdfs-text-scanner.h  | 23 +++++++++++++++++++++++
 2 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b37a793/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index a548ca1..f1c1ff5 100644
--- 
a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -57,6 +57,7 @@ HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* stat byte_buffer_ptr_(nullptr), byte_buffer_end_(nullptr), byte_buffer_read_size_(0), + byte_buffer_filled_(false), only_parsing_header_(false), scan_state_(CONSTRUCTED), boundary_pool_(new MemPool(scan_node->mem_tracker())), @@ -234,6 +235,7 @@ Status HdfsTextScanner::ResetScanner() { boundary_row_.Clear(); delimited_text_parser_->ParserReset(); byte_buffer_ptr_ = byte_buffer_end_ = nullptr; + byte_buffer_filled_ = false; partial_tuple_ = nullptr; // Initialize codegen fn @@ -271,7 +273,8 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) { // TODO: calling FillByteBuffer() at eof() can cause // ScannerContext::Stream::GetNextBuffer to DCHECK. Fix this. if (decompressor_.get() == nullptr && !stream_->eof()) { - status = FillByteBuffer(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE); + status = + FillByteBufferWrapper(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE); } if (!status.ok() || byte_buffer_read_size_ == 0) { @@ -342,7 +345,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) { bool eosr = stream_->eosr() || scan_state_ == PAST_SCAN_RANGE; while (true) { if (!eosr && byte_buffer_ptr_ == byte_buffer_end_) { - RETURN_IF_ERROR(FillByteBuffer(pool, &eosr)); + RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr)); } TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow()); @@ -462,6 +465,16 @@ Status HdfsTextScanner::GetNextInternal(RowBatch* row_batch) { return Status::OK(); } +Status HdfsTextScanner::FillByteBufferWrapper( + MemPool* pool, bool* eosr, int num_bytes) { + RETURN_IF_ERROR(FillByteBuffer(pool, eosr, num_bytes)); + if (byte_buffer_read_size_ > 0) { + byte_buffer_filled_ = true; + byte_buffer_last_byte_ = byte_buffer_end_[-1]; + } + return Status::OK(); +} + Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* 
eosr, int num_bytes) { *eosr = false; @@ -648,7 +661,7 @@ Status HdfsTextScanner::FindFirstTuple(MemPool* pool) { // Offset maybe not point to a tuple boundary, skip ahead to the first tuple start in // this scan range (if one exists). do { - RETURN_IF_ERROR(FillByteBuffer(nullptr, &eosr)); + RETURN_IF_ERROR(FillByteBufferWrapper(nullptr, &eosr)); delimited_text_parser_->ParserReset(); SCOPED_TIMER(parse_delimiter_timer_); @@ -678,7 +691,7 @@ Status HdfsTextScanner::FindFirstTuple(MemPool* pool) { } else { // Split delimiter at the end of the current buffer, but not eosr. Advance to // the correct position in the next buffer. - RETURN_IF_ERROR(FillByteBuffer(pool, &eosr)); + RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr)); DCHECK_GT(byte_buffer_read_size_, 0); DCHECK_EQ(*byte_buffer_ptr_, '\n'); byte_buffer_ptr_ += 1; @@ -703,13 +716,13 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) { DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_); *split_delimiter = false; - // Nothing in buffer - if (byte_buffer_read_size_ == 0) return Status::OK(); + // Nothing was ever read for this scan range. + if (!byte_buffer_filled_) return Status::OK(); // If the line delimiter is "\n" (meaning we also accept "\r" and "\r\n" as delimiters) // and the current buffer ends with '\r', this could be a "\r\n" delimiter. bool split_delimiter_possible = context_->partition_descriptor()->line_delim() == '\n' - && *(byte_buffer_end_ - 1) == '\r'; + && byte_buffer_last_byte_ == '\r'; if (!split_delimiter_possible) return Status::OK(); // The '\r' may be escaped. If it's not the text parser will report a complete tuple. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b37a793/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index 069ca7d..610c612 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -85,6 +85,17 @@ class HdfsTextScanner : public HdfsScanner { /// Actual bytes received from last file read. int64_t byte_buffer_read_size_; + /// Last character of the last byte buffer filled, i.e. byte_buffer_end_[-1] when the + /// buffer was last filled with data. Set in FillByteBufferWrapper() and used in + /// CheckForSplitDelimiter(). Copied out of the byte buffer so that it can be + /// referenced after the byte buffer is freed or transferred, e.g. in CommitRows(). + /// Valid if 'byte_buffer_filled_' is true. + char byte_buffer_last_byte_; + + /// True if the byte buffer was filled with at least a byte of data since the last time + /// the scanner was reset. Set in FillByteBufferWrapper(). + bool byte_buffer_filled_; + /// True if we are parsing the header for this scanner. bool only_parsing_header_; @@ -127,6 +138,14 @@ class HdfsTextScanner : public HdfsScanner { /// advances the scan state to DONE. Only valid to call in state PAST_SCAN_RANGE. Status FinishScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT; + /// Wrapper around FillByteBuffer() that also updates 'byte_buffer_last_byte_' + /// and 'byte_buffer_filled_'. Callers should call this instead of calling + /// FillByteBuffer() directly. + /// TODO: IMPALA-6146: this is a workaround that could be removed if FillByteBuffer() + /// was a cleaner interface. + Status FillByteBufferWrapper(MemPool* pool, bool* eosr, int num_bytes = 0) + WARN_UNUSED_RESULT; + /// Fills the next byte buffer from the context. This will block if there are no bytes /// ready. Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_. 
/// If num_bytes is 0, the scanner will read whatever is the io mgr buffer size, @@ -136,6 +155,10 @@ class HdfsTextScanner : public HdfsScanner { /// If applicable, attaches decompression buffers from previous calls that might still /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are /// freed instead. + /// + /// Subclasses can override this function to implement different behaviour. + /// TODO: IMPALA-6146: rethink this interface - having subclasses modify member + /// variables is brittle. virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0) WARN_UNUSED_RESULT;
