Repository: incubator-impala
Updated Branches:
  refs/heads/master 1803b403e -> 6b37a793a


IMPALA-6137: fix text scanner split delim mem mgmt

The bug was that the buffer pointed to by byte_buffer_ptr_ could be freed
by ReleaseCompletedResources() before CheckForSplitDelimiter() was called.

The simple fix is to copy out the single byte that is needed each time
the buffer is filled.

Testing:
Ran exhaustive query tests under ASAN with --disable_mem_pools=true.

Before the change, test_text_split_delimiters reliably caused an ASAN
failure when run with --disable_mem_pools=true. We should get this
coverage automatically once the I/O mgr switches to using the buffer
pool, which uses ASAN poisoning on freed buffers.

Change-Id: Iddbb5cf6acc8f0b0e0b4c205c334f21e03d06f1c
Reviewed-on: http://gerrit.cloudera.org:8080/8438
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6b37a793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6b37a793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6b37a793

Branch: refs/heads/master
Commit: 6b37a793a0af74ba01c67bb5ac7285af7c5c4d52
Parents: 1803b40
Author: Tim Armstrong <[email protected]>
Authored: Thu Nov 2 13:04:49 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Nov 3 21:38:20 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-text-scanner.cc | 27 ++++++++++++++++++++-------
 be/src/exec/hdfs-text-scanner.h  | 23 +++++++++++++++++++++++
 2 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b37a793/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index a548ca1..f1c1ff5 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -57,6 +57,7 @@ HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* stat
       byte_buffer_ptr_(nullptr),
       byte_buffer_end_(nullptr),
       byte_buffer_read_size_(0),
+      byte_buffer_filled_(false),
       only_parsing_header_(false),
       scan_state_(CONSTRUCTED),
       boundary_pool_(new MemPool(scan_node->mem_tracker())),
@@ -234,6 +235,7 @@ Status HdfsTextScanner::ResetScanner() {
   boundary_row_.Clear();
   delimited_text_parser_->ParserReset();
   byte_buffer_ptr_ = byte_buffer_end_ = nullptr;
+  byte_buffer_filled_ = false;
   partial_tuple_ = nullptr;
 
   // Initialize codegen fn
@@ -271,7 +273,8 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
     // TODO: calling FillByteBuffer() at eof() can cause
     // ScannerContext::Stream::GetNextBuffer to DCHECK. Fix this.
     if (decompressor_.get() == nullptr && !stream_->eof()) {
-      status = FillByteBuffer(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE);
+      status =
+        FillByteBufferWrapper(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE);
     }
 
     if (!status.ok() || byte_buffer_read_size_ == 0) {
@@ -342,7 +345,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
   bool eosr = stream_->eosr() || scan_state_ == PAST_SCAN_RANGE;
   while (true) {
     if (!eosr && byte_buffer_ptr_ == byte_buffer_end_) {
-      RETURN_IF_ERROR(FillByteBuffer(pool, &eosr));
+      RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr));
     }
 
     TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
@@ -462,6 +465,16 @@ Status HdfsTextScanner::GetNextInternal(RowBatch* row_batch) {
   return Status::OK();
 }
 
+Status HdfsTextScanner::FillByteBufferWrapper(
+    MemPool* pool, bool* eosr, int num_bytes) {
+  RETURN_IF_ERROR(FillByteBuffer(pool, eosr, num_bytes));
+  if (byte_buffer_read_size_ > 0) {
+    byte_buffer_filled_ = true;
+    byte_buffer_last_byte_ = byte_buffer_end_[-1];
+  }
+  return Status::OK();
+}
+
 Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes) {
   *eosr = false;
 
@@ -648,7 +661,7 @@ Status HdfsTextScanner::FindFirstTuple(MemPool* pool) {
     // Offset maybe not point to a tuple boundary, skip ahead to the first tuple start in
     // this scan range (if one exists).
     do {
-      RETURN_IF_ERROR(FillByteBuffer(nullptr, &eosr));
+      RETURN_IF_ERROR(FillByteBufferWrapper(nullptr, &eosr));
 
       delimited_text_parser_->ParserReset();
       SCOPED_TIMER(parse_delimiter_timer_);
@@ -678,7 +691,7 @@ Status HdfsTextScanner::FindFirstTuple(MemPool* pool) {
         } else {
           // Split delimiter at the end of the current buffer, but not eosr. Advance to
           // the correct position in the next buffer.
-          RETURN_IF_ERROR(FillByteBuffer(pool, &eosr));
+          RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr));
           DCHECK_GT(byte_buffer_read_size_, 0);
           DCHECK_EQ(*byte_buffer_ptr_, '\n');
           byte_buffer_ptr_ += 1;
@@ -703,13 +716,13 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
   DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
   *split_delimiter = false;
 
-  // Nothing in buffer
-  if (byte_buffer_read_size_ == 0) return Status::OK();
+  // Nothing was ever read for this scan range.
+  if (!byte_buffer_filled_) return Status::OK();
 
   // If the line delimiter is "\n" (meaning we also accept "\r" and "\r\n" as delimiters)
   // and the current buffer ends with '\r', this could be a "\r\n" delimiter.
   bool split_delimiter_possible = context_->partition_descriptor()->line_delim() == '\n'
-      && *(byte_buffer_end_ - 1) == '\r';
+      && byte_buffer_last_byte_ == '\r';
   if (!split_delimiter_possible) return Status::OK();
 
   // The '\r' may be escaped. If it's not the text parser will report a complete tuple.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b37a793/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 069ca7d..610c612 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -85,6 +85,17 @@ class HdfsTextScanner : public HdfsScanner {
   /// Actual bytes received from last file read.
   int64_t byte_buffer_read_size_;
 
+  /// Last character of the last byte buffer filled, i.e. byte_buffer_end_[-1] when the
+  /// buffer was last filled with data. Set in FillByteBufferWrapper() and used in
+  /// CheckForSplitDelimiter(). Copied out of the byte buffer so that it can be
+  /// referenced after the byte buffer is freed or transferred, e.g. in CommitRows().
+  /// Valid if 'byte_buffer_filled_' is true.
+  char byte_buffer_last_byte_;
+
+  /// True if the byte buffer was filled with at least a byte of data since the last time
+  /// the scanner was reset. Set in FillByteBufferWrapper().
+  bool byte_buffer_filled_;
+
   /// True if we are parsing the header for this scanner.
   bool only_parsing_header_;
 
@@ -127,6 +138,14 @@ class HdfsTextScanner : public HdfsScanner {
   /// advances the scan state to DONE. Only valid to call in state PAST_SCAN_RANGE.
   Status FinishScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
+  /// Wrapper around FillByteBuffer() that also updates 'byte_buffer_last_byte_'
+  /// and 'byte_buffer_filled_'. Callers should call this instead of calling
+  /// FillByteBuffer() directly.
+  /// TODO: IMPALA-6146: this is a workaround that could be removed if FillByteBuffer()
+  /// was a cleaner interface.
+  Status FillByteBufferWrapper(MemPool* pool, bool* eosr, int num_bytes = 0)
+      WARN_UNUSED_RESULT;
+
   /// Fills the next byte buffer from the context.  This will block if there are no bytes
   /// ready.  Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_.
   /// If num_bytes is 0, the scanner will read whatever is the io mgr buffer size,
@@ -136,6 +155,10 @@ class HdfsTextScanner : public HdfsScanner {
   /// If applicable, attaches decompression buffers from previous calls that might still
   /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are
   /// freed instead.
+  ///
+  /// Subclasses can override this function to implement different behaviour.
+  /// TODO: IMPALA-6146: rethink this interface - having subclasses modify member
+  /// variables is brittle.
   virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0)
       WARN_UNUSED_RESULT;
 

Reply via email to