IMPALA-4391: fix dropped statuses in scanners

As far as I'm aware, we haven't seen any failures related to these in
practice, but we're fixing them as a preventative measure.
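
For context, the bug pattern here is a Status return value silently discarded
at the call site. The sketch below is a minimal, standalone illustration of
that pattern; the Status class, the RETURN_IF_ERROR macro body, and the
CommitRows() stand-in are simplified assumptions for the example and are not
Impala's actual implementations.

#include <iostream>
#include <string>

// Toy status type: an empty message means success.
class Status {
 public:
  static Status OK() { return Status(""); }
  static Status Error(const std::string& msg) { return Status(msg); }
  bool ok() const { return msg_.empty(); }
  const std::string& msg() const { return msg_; }
 private:
  explicit Status(const std::string& msg) : msg_(msg) {}
  std::string msg_;
};

// Evaluate 'stmt' once; if it failed, return the error to the caller immediately.
#define RETURN_IF_ERROR(stmt)        \
  do {                               \
    Status _s = (stmt);              \
    if (!_s.ok()) return _s;         \
  } while (false)

// Stand-in for a call like CommitRows(0) that can fail, e.g. if allocating the
// next row batch fails.
Status CommitRows() { return Status::Error("failed to allocate next row batch"); }

// Before the fix: the returned Status is dropped, so the caller never sees the error.
void AttachPoolDropsStatus() {
  CommitRows();  // error ignored
}

// After the fix: the function returns Status and propagates the error.
Status AttachPoolPropagatesStatus() {
  RETURN_IF_ERROR(CommitRows());
  return Status::OK();
}

int main() {
  AttachPoolDropsStatus();  // nothing observable happens
  Status s = AttachPoolPropagatesStatus();
  if (!s.ok()) std::cout << "propagated error: " << s.msg() << std::endl;
  return 0;
}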

Testing:
I was unable to reproduce this easily by adding a failpoint. I suspect
that stress testing of the affected file formats would have eventually
found this, because of the similarity to IMPALA-3962.

Change-Id: I7c47f8b29a20dc74ad9e9e1c58b3d7ed95de4d35
Reviewed-on: http://gerrit.cloudera.org:8080/4938
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3e18755e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3e18755e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3e18755e

Branch: refs/heads/master
Commit: 3e18755edaecfa475fa9d0babc3e71ab1016686a
Parents: 832fb53
Author: Tim Armstrong <[email protected]>
Authored: Thu Nov 3 07:54:59 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Nov 5 04:43:55 2016 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc |  2 +-
 be/src/exec/hdfs-avro-scanner.cc     |  2 +-
 be/src/exec/hdfs-rcfile-scanner.cc   |  9 +++++----
 be/src/exec/hdfs-rcfile-scanner.h    |  4 ++--
 be/src/exec/hdfs-scanner.h           | 13 ++++++++-----
 be/src/exec/hdfs-sequence-scanner.cc |  2 +-
 be/src/exec/hdfs-text-scanner.cc     |  2 +-
 7 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 5f6bad6..cc0722d 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -152,7 +152,7 @@ Status BaseSequenceScanner::ProcessSplit() {
     static_cast<HdfsScanNode*>(scan_node_)->SetFileMetadata(
         stream_->filename(), header_);
     HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename());
-    scan_node_->AddDiskIoRanges(desc);
+    RETURN_IF_ERROR(scan_node_->AddDiskIoRanges(desc));
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index b2e948e..a073cef 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -562,7 +562,7 @@ Status HdfsAvroScanner::ProcessRange() {
     }
 
     if (decompressor_.get() != NULL && !decompressor_->reuse_output_buffer()) {
-      AttachPool(data_buffer_pool_.get(), true);
+      RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
     }
     RETURN_IF_ERROR(ReadSync());
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index 012a424..eb78fb9 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -231,7 +231,7 @@ BaseSequenceScanner::FileHeader* HdfsRCFileScanner::AllocateFileHeader() {
   return new RcFileHeader;
 }
 
-void HdfsRCFileScanner::ResetRowGroup() {
+Status HdfsRCFileScanner::ResetRowGroup() {
   num_rows_ = 0;
   row_pos_ = 0;
   key_length_ = 0;
@@ -249,13 +249,14 @@ void HdfsRCFileScanner::ResetRowGroup() {
 
   // We are done with this row group, pass along external buffers if necessary.
   if (!reuse_row_group_buffer_) {
-    AttachPool(data_buffer_pool_.get(), true);
+    RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
     row_group_buffer_size_ = 0;
   }
+  return Status::OK();
 }
 
 Status HdfsRCFileScanner::ReadRowGroup() {
-  ResetRowGroup();
+  RETURN_IF_ERROR(ResetRowGroup());
 
   while (num_rows_ == 0) {
     RETURN_IF_ERROR(ReadRowGroupHeader());
@@ -453,7 +454,7 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
 }
 
 Status HdfsRCFileScanner::ProcessRange() {
-  ResetRowGroup();
+  RETURN_IF_ERROR(ResetRowGroup());
 
   // HdfsRCFileScanner effectively does buffered IO, in that it reads all the
   // materialized columns into a row group buffer.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-rcfile-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index 4e18bc4..0330979 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -320,8 +320,8 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   ///   ReadColumnBuffers
   Status ReadRowGroup();
 
-  /// Reset state for a new row group
-  void ResetRowGroup();
+  /// Reset state for a new row group. Can fail if allocating the next row batch fails.
+  Status ResetRowGroup();
 
   /// Move to next row. Calls NextField on each column that we are reading.
   /// Modifies:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 71efd5a..3ca2744 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -327,16 +327,19 @@ class HdfsScanner {
   /// Only valid to call if the parent scan node is multi-threaded.
   Status CommitRows(int num_rows);
 
-  /// Release all memory in 'pool' to batch_. If commit_batch is true, the row batch
-  /// will be committed. commit_batch should be true if the attached pool is expected
+  /// Release all memory in 'pool' to batch_. If 'commit_batch' is true, the row batch
+  /// will be committed. 'commit_batch' should be true if the attached pool is expected
   /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage.
-  /// Only valid to call if the parent scan node is multi-threaded.
-  void AttachPool(MemPool* pool, bool commit_batch) {
+  /// Can return an error status if 'commit_batch' is true and allocating the next
+  /// batch fails, or if the query hit an error or is cancelled. Only valid to call if
+  /// the parent scan node is multi-threaded.
+  Status AttachPool(MemPool* pool, bool commit_batch) {
+  Status AttachPool(MemPool* pool, bool commit_batch) {
     DCHECK(scan_node_->HasRowBatchQueue());
     DCHECK(batch_ != NULL);
     DCHECK(pool != NULL);
     batch_->tuple_data_pool()->AcquireData(pool, false);
-    if (commit_batch) CommitRows(0);
+    if (commit_batch) RETURN_IF_ERROR(CommitRows(0));
+    return Status::OK();
   }
 
   /// Convenience function for evaluating conjuncts using this scanner's ExprContexts.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 33be362..c9f4319 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -456,7 +456,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
   // We are reading a new compressed block.  Pass the previous buffer pool
   // bytes to the batch.  We don't need them anymore.
   if (!decompressor_->reuse_output_buffer()) {
-    AttachPool(data_buffer_pool_.get(), true);
+    RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true));
   }
 
   RETURN_IF_FALSE(stream_->ReadVLong(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 0b048f4..fa267b4 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -517,7 +517,7 @@ Status HdfsTextScanner::FillByteBufferCompressedStream(bool* eosr) {
   // safe to attach the current decompression buffer to batch_ because we know that
   // it's the last row-batch that can possibly reference this buffer.
   if (!decompressor_->reuse_output_buffer()) {
-    AttachPool(data_buffer_pool_.get(), false);
+    RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), false));
   }
 
   uint8_t* decompressed_buffer = NULL;
