IMPALA-4391: fix dropped statuses in scanners. As far as I'm aware, we haven't seen any failures related to these in practice, but this change fixes them as a preventative measure.
Testing: I was unable to reproduce this easily by adding a failpoint. I suspect that stress testing of the affected file formats would have eventually found this, because of the similarity to IMPALA-3962. Change-Id: I7c47f8b29a20dc74ad9e9e1c58b3d7ed95de4d35 Reviewed-on: http://gerrit.cloudera.org:8080/4938 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3e18755e Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3e18755e Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3e18755e Branch: refs/heads/master Commit: 3e18755edaecfa475fa9d0babc3e71ab1016686a Parents: 832fb53 Author: Tim Armstrong <[email protected]> Authored: Thu Nov 3 07:54:59 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sat Nov 5 04:43:55 2016 +0000 ---------------------------------------------------------------------- be/src/exec/base-sequence-scanner.cc | 2 +- be/src/exec/hdfs-avro-scanner.cc | 2 +- be/src/exec/hdfs-rcfile-scanner.cc | 9 +++++---- be/src/exec/hdfs-rcfile-scanner.h | 4 ++-- be/src/exec/hdfs-scanner.h | 13 ++++++++----- be/src/exec/hdfs-sequence-scanner.cc | 2 +- be/src/exec/hdfs-text-scanner.cc | 2 +- 7 files changed, 19 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/base-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 5f6bad6..cc0722d 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -152,7 +152,7 @@ Status BaseSequenceScanner::ProcessSplit() { static_cast<HdfsScanNode*>(scan_node_)->SetFileMetadata( stream_->filename(), header_); HdfsFileDesc* 
desc = scan_node_->GetFileDesc(stream_->filename()); - scan_node_->AddDiskIoRanges(desc); + RETURN_IF_ERROR(scan_node_->AddDiskIoRanges(desc)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-avro-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index b2e948e..a073cef 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -562,7 +562,7 @@ Status HdfsAvroScanner::ProcessRange() { } if (decompressor_.get() != NULL && !decompressor_->reuse_output_buffer()) { - AttachPool(data_buffer_pool_.get(), true); + RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true)); } RETURN_IF_ERROR(ReadSync()); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-rcfile-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc index 012a424..eb78fb9 100644 --- a/be/src/exec/hdfs-rcfile-scanner.cc +++ b/be/src/exec/hdfs-rcfile-scanner.cc @@ -231,7 +231,7 @@ BaseSequenceScanner::FileHeader* HdfsRCFileScanner::AllocateFileHeader() { return new RcFileHeader; } -void HdfsRCFileScanner::ResetRowGroup() { +Status HdfsRCFileScanner::ResetRowGroup() { num_rows_ = 0; row_pos_ = 0; key_length_ = 0; @@ -249,13 +249,14 @@ void HdfsRCFileScanner::ResetRowGroup() { // We are done with this row group, pass along external buffers if necessary. 
if (!reuse_row_group_buffer_) { - AttachPool(data_buffer_pool_.get(), true); + RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true)); row_group_buffer_size_ = 0; } + return Status::OK(); } Status HdfsRCFileScanner::ReadRowGroup() { - ResetRowGroup(); + RETURN_IF_ERROR(ResetRowGroup()); while (num_rows_ == 0) { RETURN_IF_ERROR(ReadRowGroupHeader()); @@ -453,7 +454,7 @@ Status HdfsRCFileScanner::ReadColumnBuffers() { } Status HdfsRCFileScanner::ProcessRange() { - ResetRowGroup(); + RETURN_IF_ERROR(ResetRowGroup()); // HdfsRCFileScanner effectively does buffered IO, in that it reads all the // materialized columns into a row group buffer. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-rcfile-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h index 4e18bc4..0330979 100644 --- a/be/src/exec/hdfs-rcfile-scanner.h +++ b/be/src/exec/hdfs-rcfile-scanner.h @@ -320,8 +320,8 @@ class HdfsRCFileScanner : public BaseSequenceScanner { /// ReadColumnBuffers Status ReadRowGroup(); - /// Reset state for a new row group - void ResetRowGroup(); + /// Reset state for a new row group. Can fail if allocating the next row batch fails. + Status ResetRowGroup(); /// Move to next row. Calls NextField on each column that we are reading. /// Modifies: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 71efd5a..3ca2744 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -327,16 +327,19 @@ class HdfsScanner { /// Only valid to call if the parent scan node is multi-threaded. Status CommitRows(int num_rows); - /// Release all memory in 'pool' to batch_. If commit_batch is true, the row batch - /// will be committed. 
commit_batch should be true if the attached pool is expected + /// Release all memory in 'pool' to batch_. If 'commit_batch' is true, the row batch + /// will be committed. 'commit_batch' should be true if the attached pool is expected /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage. - /// Only valid to call if the parent scan node is multi-threaded. - void AttachPool(MemPool* pool, bool commit_batch) { + /// Can return an error status if 'commit_batch' is true and allocating the next + /// batch fails, or if the query hit an error or is cancelled. Only valid to call if + /// the parent scan node is multi-threaded. + Status AttachPool(MemPool* pool, bool commit_batch) { DCHECK(scan_node_->HasRowBatchQueue()); DCHECK(batch_ != NULL); DCHECK(pool != NULL); batch_->tuple_data_pool()->AcquireData(pool, false); - if (commit_batch) CommitRows(0); + if (commit_batch) RETURN_IF_ERROR(CommitRows(0)); + return Status::OK(); } /// Convenience function for evaluating conjuncts using this scanner's ExprContexts. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index 33be362..c9f4319 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -456,7 +456,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() { // We are reading a new compressed block. Pass the previous buffer pool // bytes to the batch. We don't need them anymore. 
if (!decompressor_->reuse_output_buffer()) { - AttachPool(data_buffer_pool_.get(), true); + RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true)); } RETURN_IF_FALSE(stream_->ReadVLong( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3e18755e/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 0b048f4..fa267b4 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -517,7 +517,7 @@ Status HdfsTextScanner::FillByteBufferCompressedStream(bool* eosr) { // safe to attach the current decompression buffer to batch_ because we know that // it's the last row-batch that can possibly reference this buffer. if (!decompressor_->reuse_output_buffer()) { - AttachPool(data_buffer_pool_.get(), false); + RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), false)); } uint8_t* decompressed_buffer = NULL;
