Repository: incubator-impala
Updated Branches:
  refs/heads/master ac1215fd3 -> 834365618


IMPALA-3905: Add HdfsScanner::GetNext() interface and implementation for 
Parquet.

This is a first step towards making our scan node single threaded since we are
moving to an execution model where multi-threading is done at the fragment 
level.

This patch adds a new synchronous HdfsScanner::GetNext() interface and 
implements
it for the Parquet scanner. The async execution via HdfsScanner::ProcessSplit()
is still supported and is implemented by repeatedly calling GetNext() for
code sharing purposes.

I did not yet add a single-threaded scan node that uses GetNext().
The new code will be exercised by the existing scan node and tests.

Testing: I ran an exhaustive private build which passed. I also
ran a microbenchmark on a big TPCH lineitem table and there was no
significant difference in scan performance.

Change-Id: Iab50770bac05afcda4d3404fb4f53a0104931eb0
Reviewed-on: http://gerrit.cloudera.org:8080/3801
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Alex Behm <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/83436561
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/83436561
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/83436561

Branch: refs/heads/master
Commit: 8343656189d631491b7372a60e228c851b61165c
Parents: ac1215f
Author: Alex Behm <[email protected]>
Authored: Mon Jul 11 21:54:21 2016 -0700
Committer: Alex Behm <[email protected]>
Committed: Sat Jul 30 23:52:27 2016 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc |  23 +-
 be/src/exec/base-sequence-scanner.h  |   6 +-
 be/src/exec/hdfs-avro-scanner.cc     |   9 +-
 be/src/exec/hdfs-avro-scanner.h      |   4 +-
 be/src/exec/hdfs-parquet-scanner.cc  | 491 +++++++++++++++++-------------
 be/src/exec/hdfs-parquet-scanner.h   |  82 +++--
 be/src/exec/hdfs-rcfile-scanner.cc   |   9 +-
 be/src/exec/hdfs-rcfile-scanner.h    |   5 +-
 be/src/exec/hdfs-scan-node.cc        |  31 +-
 be/src/exec/hdfs-scan-node.h         |  24 +-
 be/src/exec/hdfs-scanner-ir.cc       |   1 +
 be/src/exec/hdfs-scanner.cc          |  22 +-
 be/src/exec/hdfs-scanner.h           |  78 +++--
 be/src/exec/hdfs-sequence-scanner.cc |   9 +-
 be/src/exec/hdfs-sequence-scanner.h  |  17 +-
 be/src/exec/hdfs-text-scanner.cc     |  27 +-
 be/src/exec/hdfs-text-scanner.h      |   7 +-
 be/src/exec/parquet-column-readers.h |  16 +-
 18 files changed, 501 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc 
b/be/src/exec/base-sequence-scanner.cc
index 268fdae..2af3ee7 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -63,8 +63,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNode* 
scan_node,
   return Status::OK();
 }
 
-BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* 
state)
-  : HdfsScanner(node, state),
+BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* 
state,
+    bool add_batches_to_queue)
+  : HdfsScanner(node, state, add_batches_to_queue),
     header_(NULL),
     block_start_(0),
     total_block_size_(0),
@@ -83,15 +84,17 @@ BaseSequenceScanner::BaseSequenceScanner()
 BaseSequenceScanner::~BaseSequenceScanner() {
 }
 
-Status BaseSequenceScanner::Prepare(ScannerContext* context) {
-  RETURN_IF_ERROR(HdfsScanner::Prepare(context));
+Status BaseSequenceScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
   stream_->set_read_past_size_cb(bind(&BaseSequenceScanner::ReadPastSize, 
this, _1));
   bytes_skipped_counter_ = ADD_COUNTER(
       scan_node_->runtime_profile(), "BytesSkipped", TUnit::BYTES);
+  // Allocate a new row batch. May fail if mem limit is exceeded.
+  RETURN_IF_ERROR(StartNewRowBatch());
   return Status::OK();
 }
 
-void BaseSequenceScanner::Close() {
+void BaseSequenceScanner::Close(RowBatch* row_batch) {
   VLOG_FILE << "Bytes read past scan range: " << -stream_->bytes_left();
   VLOG_FILE << "Average block size: "
             << (num_syncs_ > 1 ? total_block_size_ / (num_syncs_ - 1) : 0);
@@ -101,9 +104,10 @@ void BaseSequenceScanner::Close() {
     decompressor_->Close();
     decompressor_.reset(NULL);
   }
-  if (batch_ != NULL) {
-    AttachPool(data_buffer_pool_.get(), false);
-    AddFinalRowBatch();
+  if (row_batch != NULL) {
+    row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
+    context_->ReleaseCompletedResources(row_batch, true);
+    if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch);
   }
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
@@ -112,10 +116,11 @@ void BaseSequenceScanner::Close() {
   if (!only_parsing_header_ && header_ != NULL) {
     scan_node_->RangeComplete(file_format(), header_->compression_type);
   }
-  HdfsScanner::Close();
+  HdfsScanner::Close(row_batch);
 }
 
 Status BaseSequenceScanner::ProcessSplit() {
+  DCHECK(add_batches_to_queue_);
   header_ = reinterpret_cast<FileHeader*>(
       scan_node_->GetFileMetadata(stream_->filename()));
   if (header_ == NULL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h 
b/be/src/exec/base-sequence-scanner.h
index ea7ad78..7c02aef 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -38,8 +38,8 @@ class BaseSequenceScanner : public HdfsScanner {
   static Status IssueInitialRanges(HdfsScanNode* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
-  virtual Status Prepare(ScannerContext* context);
-  virtual void Close();
+  virtual Status Open(ScannerContext* context);
+  virtual void Close(RowBatch* row_batch);
   virtual Status ProcessSplit();
 
   virtual ~BaseSequenceScanner();
@@ -99,7 +99,7 @@ class BaseSequenceScanner : public HdfsScanner {
   /// Returns type of scanner: e.g. rcfile, seqfile
   virtual THdfsFileFormat::type file_format() const = 0;
 
-  BaseSequenceScanner(HdfsScanNode*, RuntimeState*);
+  BaseSequenceScanner(HdfsScanNode*, RuntimeState*, bool);
 
   /// Read and validate sync marker against header_->sync.  Returns non-ok if 
the sync
   /// marker did not match. Scanners should always use this function to read 
sync markers,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 09d3955..39a204c 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -52,8 +52,9 @@ const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() 
failed to allocate
 
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
-HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state)
-  : BaseSequenceScanner(scan_node, state),
+HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state,
+    bool add_batches_to_queue)
+  : BaseSequenceScanner(scan_node, state, add_batches_to_queue),
     avro_header_(NULL),
     codegend_decode_avro_data_(NULL) {
 }
@@ -65,8 +66,8 @@ HdfsAvroScanner::HdfsAvroScanner()
   DCHECK(TestInfo::is_test());
 }
 
-Status HdfsAvroScanner::Prepare(ScannerContext* context) {
-  RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context));
+Status HdfsAvroScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
   if (scan_node_->avro_schema().schema == NULL) {
     return Status("Missing Avro schema in scan node. This could be due to 
stale "
         "metadata. Running 'invalidate metadata <tablename>' may resolve the 
problem.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index ac2ba40..1cd218c 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -86,9 +86,9 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// Avro file: {'O', 'b', 'j', 1}
   static const uint8_t AVRO_VERSION_HEADER[4];
 
-  HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state);
+  HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state, bool 
add_batches_to_queue);
 
-  virtual Status Prepare(ScannerContext* context);
+  virtual Status Open(ScannerContext* context);
 
   /// Codegen parsing records, writing tuples and evaluating predicates.
   static llvm::Function* Codegen(HdfsScanNode*,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index 6855fef..02f8a3e 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -132,8 +132,12 @@ DiskIoMgr::ScanRange* 
HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
 
 namespace impala {
 
-HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* 
state)
-    : HdfsScanner(scan_node, state),
+HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* 
state,
+    bool add_batches_to_queue)
+    : HdfsScanner(scan_node, state, add_batches_to_queue),
+      row_group_idx_(-1),
+      row_group_rows_read_(0),
+      advance_row_group_(true),
       scratch_batch_(new ScratchTupleBatch(
           scan_node->row_desc(), state_->batch_size(), 
scan_node->mem_tracker())),
       metadata_range_(NULL),
@@ -144,8 +148,9 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* 
scan_node, RuntimeState* st
   assemble_rows_timer_.Stop();
 }
 
-Status HdfsParquetScanner::Prepare(ScannerContext* context) {
-  RETURN_IF_ERROR(HdfsScanner::Prepare(context));
+Status HdfsParquetScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
+  stream_->set_contains_tuple_data(false);
   metadata_range_ = stream_->scan_range();
   num_cols_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
@@ -162,56 +167,74 @@ Status HdfsParquetScanner::Prepare(ScannerContext* 
context) {
     if (!ctx->filter->AlwaysTrue()) filter_ctxs_.push_back(ctx);
   }
   filter_stats_.resize(filter_ctxs_.size());
+
+  DCHECK(parse_status_.ok()) << "Invalid parse_status_" << 
parse_status_.GetDetail();
+
+  // First process the file metadata in the footer.
+  Status status = ProcessFooter();
+  // Release I/O buffers immediately to make sure they are cleaned up
+  // in case we return a non-OK status anywhere below.
+  context_->ReleaseCompletedResources(NULL, true);
+  RETURN_IF_ERROR(status);
+
+  // Parse the file schema into an internal representation for schema 
resolution.
+  schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
+      state_->query_options().parquet_fallback_schema_resolution));
+  RETURN_IF_ERROR(schema_resolver_->Init(&file_metadata_, filename()));
+
+  // We've processed the metadata and there are columns that need to be 
materialized.
+  RETURN_IF_ERROR(CreateColumnReaders(
+      *scan_node_->tuple_desc(), *schema_resolver_, &column_readers_));
+  COUNTER_SET(num_cols_counter_,
+      static_cast<int64_t>(CountScalarColumns(column_readers_)));
+  // Set top-level template tuple.
+  template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+
+  // The scanner-wide stream was used only to read the file footer.  Each 
column has added
+  // its own stream.
+  stream_ = NULL;
   return Status::OK();
 }
 
-void HdfsParquetScanner::Close() {
-  vector<THdfsCompression::type> compression_types;
+void HdfsParquetScanner::Close(RowBatch* row_batch) {
+  if (row_batch != NULL) {
+    FlushRowGroupResources(row_batch);
+    if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch);
+  }
+  // Verify all resources (if any) have been transferred.
+  DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0);
+  DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
+  DCHECK_EQ(context_->num_completed_io_buffers(), 0);
 
-  // Visit each column reader, including collection reader children.
+  // Collect compression types for reporting completed ranges.
+  vector<THdfsCompression::type> compression_types;
   stack<ParquetColumnReader*> readers;
   for (ParquetColumnReader* r: column_readers_) readers.push(r);
   while (!readers.empty()) {
-    ParquetColumnReader* col_reader = readers.top();
+    ParquetColumnReader* reader = readers.top();
     readers.pop();
-
-    if (col_reader->IsCollectionReader()) {
-      CollectionColumnReader* collection_reader =
-          static_cast<CollectionColumnReader*>(col_reader);
-      for (ParquetColumnReader* r: *collection_reader->children()) 
readers.push(r);
+    if (reader->IsCollectionReader()) {
+      CollectionColumnReader* coll_reader = 
static_cast<CollectionColumnReader*>(reader);
+      for (ParquetColumnReader* r: *coll_reader->children()) readers.push(r);
       continue;
     }
-
-    BaseScalarColumnReader* scalar_reader =
-        static_cast<BaseScalarColumnReader*>(col_reader);
-    if (scalar_reader->decompressed_data_pool() != NULL) {
-      // No need to commit the row batches with the AttachPool() calls
-      // since AddFinalRowBatch() already does below.
-      AttachPool(scalar_reader->decompressed_data_pool(), false);
-    }
-    scalar_reader->Close();
+    BaseScalarColumnReader* scalar_reader = 
static_cast<BaseScalarColumnReader*>(reader);
     compression_types.push_back(scalar_reader->codec());
   }
-  if (batch_ != NULL) {
-    AttachPool(dictionary_pool_.get(), false);
-    AttachPool(scratch_batch_->mem_pool(), false);
-    AddFinalRowBatch();
-  }
-  // Verify all resources (if any) have been transferred.
-  DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0);
-  DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
-  DCHECK_EQ(context_->num_completed_io_buffers(), 0);
+  assemble_rows_timer_.Stop();
+  assemble_rows_timer_.ReleaseCounter();
+
   // If this was a metadata only read (i.e. count(*)), there are no columns.
   if (compression_types.empty()) 
compression_types.push_back(THdfsCompression::NONE);
   scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
-  assemble_rows_timer_.Stop();
-  assemble_rows_timer_.ReleaseCounter();
 
   if (level_cache_pool_.get() != NULL) {
     level_cache_pool_->FreeAll();
-    level_cache_pool_.reset(NULL);
+    level_cache_pool_.reset();
   }
 
+  if (schema_resolver_.get() != NULL) schema_resolver_.reset();
+
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
     const FilterStats* stats = filter_ctxs_[i]->stats;
     const LocalFilterStats& local = filter_stats_[i];
@@ -219,7 +242,7 @@ void HdfsParquetScanner::Close() {
         local.considered, local.rejected);
   }
 
-  HdfsScanner::Close();
+  HdfsScanner::Close(row_batch);
 }
 
 // Get the start of the column.
@@ -263,33 +286,102 @@ int HdfsParquetScanner::CountScalarColumns(const 
vector<ParquetColumnReader*>& c
 }
 
 Status HdfsParquetScanner::ProcessSplit() {
-  DCHECK(parse_status_.ok()) << "Invalid parse_status_" << 
parse_status_.GetDetail();
-  // First process the file metadata in the footer
-  bool eosr;
-  RETURN_IF_ERROR(ProcessFooter(&eosr));
-  if (eosr) return Status::OK();
+  DCHECK(add_batches_to_queue_);
+  bool scanner_eos = false;
+  do {
+    RETURN_IF_ERROR(StartNewRowBatch());
+    RETURN_IF_ERROR(GetNextInternal(batch_, &scanner_eos));
+    scan_node_->AddMaterializedRowBatch(batch_);
+  } while (!scanner_eos && !scan_node_->ReachedLimit());
+
+  // Transfer the remaining resources to this new batch in Close().
+  RETURN_IF_ERROR(StartNewRowBatch());
+  return Status::OK();
+}
 
-  // Parse the file schema into an internal representation for schema 
resolution.
-  ParquetSchemaResolver schema_resolver(*scan_node_->hdfs_table(),
-      state_->query_options().parquet_fallback_schema_resolution);
-  RETURN_IF_ERROR(schema_resolver.Init(&file_metadata_, filename()));
+Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
+  if (scan_node_->IsZeroSlotTableScan()) {
+    // There are no materialized slots, e.g. count(*) over the table.  We can 
serve
+    // this query from just the file metadata. We don't need to read the 
column data.
+    if (row_group_rows_read_ == file_metadata_.num_rows) {
+      *eos = true;
+      return Status::OK();
+    }
+    assemble_rows_timer_.Start();
+    int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
+    int max_tuples = min(row_batch->capacity(), rows_remaining);
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+    Status status = CommitRows(row_batch, num_to_commit);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(status);
+    row_group_rows_read_ += num_to_commit;
+    COUNTER_ADD(scan_node_->rows_read_counter(), row_group_rows_read_);
+    return Status::OK();
+  }
 
-  // We've processed the metadata and there are columns that need to be 
materialized.
-  RETURN_IF_ERROR(
-      CreateColumnReaders(*scan_node_->tuple_desc(), schema_resolver, 
&column_readers_));
-  COUNTER_SET(num_cols_counter_,
-      static_cast<int64_t>(CountScalarColumns(column_readers_)));
-  // Set top-level template tuple.
-  template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+  // Transfer remaining tuples from the scratch batch.
+  if (!scratch_batch_->AtEnd()) {
+    assemble_rows_timer_.Start();
+    int num_row_to_commit = TransferScratchTuples(row_batch);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
+    if (row_batch->AtCapacity()) return Status::OK();
+  }
 
-  // The scanner-wide stream was used only to read the file footer.  Each 
column has added
-  // its own stream.
-  stream_ = NULL;
+  while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
+    if (!advance_row_group_) {
+      // End of the previous row group. Transfer resources and clear streams 
because
+      // we will create new streams for the next row group.
+      FlushRowGroupResources(row_batch);
+      context_->ClearStreams();
+      Status status =
+          ValidateEndOfRowGroup(column_readers_, row_group_idx_, 
row_group_rows_read_);
+      if (!status.ok()) 
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+    }
+    RETURN_IF_ERROR(NextRowGroup());
+    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+      *eos = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
+  }
+
+  // Apply any runtime filters to static tuples containing the partition keys 
for this
+  // partition. If any filter fails, we return immediately and stop processing 
this
+  // scan range.
+  if 
(!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+    *eos = true;
+    DCHECK(parse_status_.ok());
+    return Status::OK();
+  }
+  assemble_rows_timer_.Start();
+  Status status = AssembleRows(column_readers_, row_batch, 
&advance_row_group_);
+  assemble_rows_timer_.Stop();
+  RETURN_IF_ERROR(status);
+  if (!parse_status_.ok()) {
+    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+    parse_status_ = Status::OK();
+  }
+
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::NextRowGroup() {
+  advance_row_group_ = false;
+  row_group_rows_read_ = 0;
 
-  // Iterate through each row group in the file and process any row groups 
that fall
-  // within this split.
-  for (int i = 0; i < file_metadata_.row_groups.size(); ++i) {
-    const parquet::RowGroup& row_group = file_metadata_.row_groups[i];
+  // Loop until we have found a non-empty row group, and successfully 
initialized and
+  // seeded the column readers. Return a non-OK status from within loop only 
if the error
+  // is non-recoverable, otherwise log the error and continue with the next 
row group.
+  while (true) {
+    // Reset the parse status for the next row group.
+    parse_status_ = Status::OK();
+
+    ++row_group_idx_;
+    if (row_group_idx_ >= file_metadata_.row_groups.size()) break;
+    const parquet::RowGroup& row_group = 
file_metadata_.row_groups[row_group_idx_];
     if (row_group.num_rows == 0) continue;
 
     const DiskIoMgr::ScanRange* split_range = 
reinterpret_cast<ScanRangeMetadata*>(
@@ -302,23 +394,16 @@ Status HdfsParquetScanner::ProcessSplit() {
     int64_t split_offset = split_range->offset();
     int64_t split_length = split_range->len();
     if (!(row_group_mid_pos >= split_offset &&
-        row_group_mid_pos < split_offset + split_length)) continue;
+        row_group_mid_pos < split_offset + split_length)) {
+      // A row group is processed by the scanner whose split overlaps with the 
row
+      // group's mid point. This row group will be handled by a different 
scanner.
+      continue;
+    }
     COUNTER_ADD(num_row_groups_counter_, 1);
 
-    // Attach any resources and clear the streams before starting a new row 
group. These
-    // streams could either be just the footer stream or streams for the 
previous row
-    // group.
-    context_->ReleaseCompletedResources(batch_, /* done */ true);
-    context_->ClearStreams();
-    // Commit the rows to flush the row batch from the previous row group
-    CommitRows(0);
-
-    RETURN_IF_ERROR(InitColumns(i, column_readers_));
-
-    assemble_rows_timer_.Start();
-
     // Prepare column readers for first read
-    bool continue_execution = true;
+    RETURN_IF_ERROR(InitColumns(row_group_idx_, column_readers_));
+    bool seeding_ok = true;
     for (ParquetColumnReader* col_reader: column_readers_) {
       // Seed collection and boolean column readers with NextLevel().
       // The ScalarColumnReaders use an optimized ReadValueBatch() that
@@ -328,48 +413,127 @@ Status HdfsParquetScanner::ProcessSplit() {
       // will allow better sharing of code between the row-wise and column-wise
       // materialization strategies.
       if (col_reader->NeedsSeedingForBatchedReading()) {
-        continue_execution = col_reader->NextLevels();
+        if (!col_reader->NextLevels()) {
+          seeding_ok = false;
+          break;
+        }
       }
-      if (!continue_execution) break;
       DCHECK(parse_status_.ok()) << "Invalid parse_status_" << 
parse_status_.GetDetail();
     }
 
-    bool filters_pass = true;
-    if (continue_execution) {
-      continue_execution = AssembleRows(column_readers_, i, &filters_pass);
-      assemble_rows_timer_.Stop();
+    if (!parse_status_.ok()) {
+      RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+    } else if (seeding_ok) {
+      // Found a non-empty row group and successfully initialized the column 
readers.
+      break;
     }
+  }
 
-    // Check the query_status_ before logging the parse_status_. query_status_ 
is merged
-    // with parse_status_ in AssembleRows(). It's misleading to log 
query_status_ as parse
-    // error because it is shared by all threads in the same fragment instance 
and it's
-    // unclear which threads caused the error.
-    //
-    // TODO: It's a really bad idea to propagate UDF error via the global 
RuntimeState.
-    // Store UDF error in thread local storage or make UDF return status so it 
can merge
-    // with parse_status_.
-    RETURN_IF_ERROR(state_->GetQueryStatus());
-    if (UNLIKELY(!parse_status_.ok())) {
-      RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+  DCHECK(parse_status_.ok());
+  return Status::OK();
+}
+
+void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
+  DCHECK(row_batch != NULL);
+  row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
+  row_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false);
+  context_->ReleaseCompletedResources(row_batch, true);
+  for (ParquetColumnReader* col_reader: column_readers_) {
+    col_reader->Close(row_batch);
+  }
+}
+
+/// High-level steps of this function:
+/// 1. Allocate 'scratch' memory for tuples able to hold a full batch
+/// 2. Populate the slots of all scratch tuples one column reader at a time,
+///    using the ColumnReader::Read*ValueBatch() functions.
+/// 3. Evaluate runtime filters and conjuncts against the scratch tuples and
+///    set the surviving tuples in the output batch. Transfer the ownership of
+///    scratch memory to the output batch once the scratch memory is exhausted.
+/// 4. Repeat steps above until we are done with the row group or an error
+///    occurred.
+/// TODO: Since the scratch batch is populated in a column-wise fashion, it is
+/// difficult to maintain a maximum memory footprint without throwing away at 
least
+/// some work. This point needs further experimentation and thought.
+Status HdfsParquetScanner::AssembleRows(
+    const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
+    bool* skip_row_group) {
+  DCHECK(!column_readers.empty());
+  DCHECK(row_batch != NULL);
+  DCHECK_EQ(*skip_row_group, false);
+  DCHECK(scratch_batch_ != NULL);
+
+  while (!column_readers[0]->RowGroupAtEnd()) {
+    // Start a new scratch batch.
+    RETURN_IF_ERROR(scratch_batch_->Reset(state_));
+    int scratch_capacity = scratch_batch_->capacity();
+
+    // Initialize tuple memory.
+    for (int i = 0; i < scratch_capacity; ++i) {
+      InitTuple(template_tuple_, scratch_batch_->GetTuple(i));
     }
-    if (scan_node_->ReachedLimit()) return Status::OK();
-    if (context_->cancelled()) return Status::OK();
-    if (!filters_pass) return Status::OK();
-
-    DCHECK(continue_execution || !state_->abort_on_error());
-    // We should be at the end of the row group if we get this far with no 
parse error
-    if (parse_status_.ok()) DCHECK(column_readers_[0]->RowGroupAtEnd());
-    // Reset parse_status_ for the next row group.
-    parse_status_ = Status::OK();
+
+    // Materialize the top-level slots into the scratch batch column-by-column.
+    int last_num_tuples = -1;
+    int num_col_readers = column_readers.size();
+    bool continue_execution = true;
+    for (int c = 0; c < num_col_readers; ++c) {
+      ParquetColumnReader* col_reader = column_readers[c];
+      if (col_reader->max_rep_level() > 0) {
+        continue_execution = col_reader->ReadValueBatch(
+            scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
+            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
+      } else {
+        continue_execution = col_reader->ReadNonRepeatedValueBatch(
+            scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
+            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
+      }
+      if (UNLIKELY(!continue_execution)) {
+        *skip_row_group = true;
+        return Status::OK();
+      }
+      // Check that all column readers populated the same number of values.
+      if (c != 0) DCHECK_EQ(last_num_tuples, scratch_batch_->num_tuples);
+      last_num_tuples = scratch_batch_->num_tuples;
+    }
+    row_group_rows_read_ += scratch_batch_->num_tuples;
+    COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples);
+
+    int num_row_to_commit = TransferScratchTuples(row_batch);
+    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
+    if (row_batch->AtCapacity()) return Status::OK();
+  }
+
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
+  DCHECK(dst_batch != NULL);
+  dst_batch->CommitRows(num_rows);
+
+  // We need to pass the row batch to the scan node if there is too much 
memory attached,
+  // which can happen if the query is very selective. We need to release 
memory even
+  // if no rows passed predicates.
+  if (dst_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) {
+    context_->ReleaseCompletedResources(dst_batch, /* done */ false);
+  }
+  if (context_->cancelled()) return Status::CANCELLED;
+  // TODO: It's a really bad idea to propagate UDF error via the global 
RuntimeState.
+  // Store UDF error in thread local storage or make UDF return status so it 
can merge
+  // with parse_status_.
+  RETURN_IF_ERROR(state_->GetQueryStatus());
+  // Free local expr allocations for this thread
+  for (const auto& kv: scanner_conjuncts_map_) {
+    ExprContext::FreeLocalAllocations(kv.second);
   }
   return Status::OK();
 }
 
-int HdfsParquetScanner::TransferScratchTuples() {
+int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   // This function must not be called when the output batch is already full. 
As long as
   // we always call CommitRows() after TransferScratchTuples(), the output 
batch can
   // never be empty.
-  DCHECK_LT(batch_->num_rows(), batch_->capacity());
+  DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
 
   const bool has_filters = !filter_ctxs_.empty();
   const bool has_conjuncts = !scanner_conjunct_ctxs_->empty();
@@ -378,10 +542,11 @@ int HdfsParquetScanner::TransferScratchTuples() {
 
   // Start/end/current iterators over the output rows.
   DCHECK_EQ(scan_node_->tuple_idx(), 0);
-  DCHECK_EQ(batch_->row_desc().tuple_descriptors().size(), 1);
+  DCHECK_EQ(dst_batch->row_desc().tuple_descriptors().size(), 1);
   Tuple** output_row_start =
-      reinterpret_cast<Tuple**>(batch_->GetRow(batch_->num_rows()));
-  Tuple** output_row_end = output_row_start + (batch_->capacity() - 
batch_->num_rows());
+      reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
+  Tuple** output_row_end =
+      output_row_start + (dst_batch->capacity() - dst_batch->num_rows());
   Tuple** output_row = output_row_start;
 
   // Start/end/current iterators over the scratch tuples.
@@ -397,7 +562,7 @@ int HdfsParquetScanner::TransferScratchTuples() {
     DCHECK(!has_filters);
     DCHECK(!has_conjuncts);
     DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
-    int num_tuples = min(batch_->capacity() - batch_->num_rows(),
+    int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(),
         scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
     memset(output_row, 0, num_tuples * sizeof(Tuple*));
     scratch_batch_->tuple_idx += num_tuples;
@@ -430,7 +595,7 @@ int HdfsParquetScanner::TransferScratchTuples() {
   // quickly accumulate memory in the output batch, hit the memory capacity 
limit,
   // and return an output batch with relatively few rows.
   if (scratch_batch_->AtEnd()) {
-    batch_->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false);
+    dst_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), 
false);
   }
   return output_row - output_row_start;
 }
@@ -465,90 +630,6 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) 
{
   return true;
 }
 
-/// High-level steps of this function:
-/// 1. Allocate 'scratch' memory for tuples able to hold a full batch
-/// 2. Populate the slots of all scratch tuples one column reader at a time,
-///    using the ColumnReader::Read*ValueBatch() functions.
-/// 3. Evaluate runtime filters and conjuncts against the scratch tuples and
-///    set the surviving tuples in the output batch. Transfer the ownership of
-///    scratch memory to the output batch once the scratch memory is exhausted.
-/// 4. Repeat steps above until we are done with the row group or an error
-///    occurred.
-/// TODO: Since the scratch batch is populated in a column-wise fashion, it is
-/// difficult to maintain a maximum memory footprint without throwing away at 
least
-/// some work. This point needs further experimentation and thought.
-bool HdfsParquetScanner::AssembleRows(
-    const vector<ParquetColumnReader*>& column_readers, int row_group_idx, 
bool* filters_pass) {
-  DCHECK(!column_readers.empty());
-  DCHECK(scratch_batch_ != NULL);
-
-  int64_t rows_read = 0;
-  bool continue_execution = !scan_node_->ReachedLimit() && 
!context_->cancelled();
-  while (!column_readers[0]->RowGroupAtEnd()) {
-    if (UNLIKELY(!continue_execution)) break;
-
-    // Apply any runtime filters to static tuples containing the partition 
keys for this
-    // partition. If any filter fails, we return immediately and stop 
processing this
-    // row group.
-    if (!scan_node_->PartitionPassesFilterPredicates(
-        context_->partition_descriptor()->id(),
-        FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
-      *filters_pass = false;
-      return false;
-    }
-
-    // Start a new scratch batch.
-    parse_status_.MergeStatus(scratch_batch_->Reset(state_));
-    if (UNLIKELY(!parse_status_.ok())) return false;
-    int scratch_capacity = scratch_batch_->capacity();
-
-    // Initialize tuple memory.
-    for (int i = 0; i < scratch_capacity; ++i) {
-      InitTuple(template_tuple_, scratch_batch_->GetTuple(i));
-    }
-
-    // Materialize the top-level slots into the scratch batch column-by-column.
-    int last_num_tuples = -1;
-    int num_col_readers = column_readers.size();
-    for (int c = 0; c < num_col_readers; ++c) {
-      ParquetColumnReader* col_reader = column_readers[c];
-      if (col_reader->max_rep_level() > 0) {
-        continue_execution = col_reader->ReadValueBatch(
-            scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
-            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
-      } else {
-        continue_execution = col_reader->ReadNonRepeatedValueBatch(
-            scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
-            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
-      }
-      if (UNLIKELY(!continue_execution)) return false;
-      // Check that all column readers populated the same number of values.
-      if (c != 0) DCHECK_EQ(last_num_tuples, scratch_batch_->num_tuples);
-      last_num_tuples = scratch_batch_->num_tuples;
-    }
-
-    // Keep transferring scratch tuples to output batches until the scratch 
batch
-    // is empty. CommitRows() creates new output batches as necessary.
-    do {
-      int num_row_to_commit = TransferScratchTuples();
-      parse_status_.MergeStatus(CommitRows(num_row_to_commit));
-      if (UNLIKELY(!parse_status_.ok())) return false;
-    } while (!scratch_batch_->AtEnd());
-
-    rows_read += scratch_batch_->num_tuples;
-    COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples);
-    continue_execution &= parse_status_.ok();
-    continue_execution &= !scan_node_->ReachedLimit() && 
!context_->cancelled();
-  }
-
-  if (column_readers[0]->RowGroupAtEnd() && parse_status_.ok()) {
-    parse_status_ = ValidateEndOfRowGroup(column_readers, row_group_idx, 
rows_read);
-    continue_execution &= parse_status_.ok();
-  }
-
-  return continue_execution;
-}
-
 bool HdfsParquetScanner::AssembleCollection(
     const vector<ParquetColumnReader*>& column_readers, int 
new_collection_rep_level,
     CollectionValueBuilder* coll_value_builder) {
@@ -654,8 +735,7 @@ inline bool HdfsParquetScanner::ReadCollectionItem(
   return continue_execution;
 }
 
-Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
-  *eosr = false;
+Status HdfsParquetScanner::ProcessFooter() {
   int64_t len = stream_->scan_range()->len();
 
   // We're processing the scan range issued in IssueInitialRanges(). The scan 
range should
@@ -769,33 +849,6 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
   if (file_metadata_.__isset.created_by) {
     file_version_ = ParquetFileVersion(file_metadata_.created_by);
   }
-
-  if (scan_node_->IsZeroSlotTableScan()) {
-    // There are no materialized slots, e.g. count(*) over the table.  We can 
serve
-    // this query from just the file metadata.  We don't need to read the 
column data.
-    int64_t num_tuples = file_metadata_.num_rows;
-    COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
-
-    while (num_tuples > 0) {
-      MemPool* pool;
-      Tuple* tuple;
-      TupleRow* current_row;
-      int max_tuples = GetMemory(&pool, &tuple, &current_row);
-      max_tuples = min<int64_t>(max_tuples, num_tuples);
-      num_tuples -= max_tuples;
-
-      int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
-      RETURN_IF_ERROR(CommitRows(num_to_commit));
-    }
-
-    *eosr = true;
-    return Status::OK();
-  } else if (file_metadata_.num_rows == 0) {
-    // Empty file
-    *eosr = true;
-    return Status::OK();
-  }
-
   if (file_metadata_.row_groups.empty()) {
     return Status(
         Substitute("Invalid file. This file: $0 has no row groups", 
filename()));
@@ -1075,7 +1128,7 @@ Status HdfsParquetScanner::ValidateEndOfRowGroup(
     // column has more values than stated in the metadata, meaning the final 
data page
     // will still have unread values.
     if (reader->num_buffered_values_ != 0) {
-      return Status(Substitute("Corrupt parquet metadata in file '$0': 
metadata reports "
+      return Status(Substitute("Corrupt Parquet metadata in file '$0': 
metadata reports "
           "'$1' more values in data page than actually present", filename(),
           reader->num_buffered_values_));
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h 
b/be/src/exec/hdfs-parquet-scanner.h
index 3791ae1..830701d 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -317,18 +317,19 @@ class BoolColumnReader;
 /// the ScannerContext.
 class HdfsParquetScanner : public HdfsScanner {
  public:
-  HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state);
-
+  HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state,
+      bool add_batches_to_queue);
   virtual ~HdfsParquetScanner() {};
-  virtual Status Prepare(ScannerContext* context);
-  virtual void Close();
-  virtual Status ProcessSplit();
 
   /// Issue just the footer range for each file.  We'll then parse the footer 
and pick
   /// out the columns we want.
   static Status IssueInitialRanges(HdfsScanNode* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
+  virtual Status Open(ScannerContext* context);
+  virtual Status ProcessSplit();
+  virtual void Close(RowBatch* row_batch);
+
   /// The repetition level is set to this value to indicate the end of a row 
group.
   static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
   /// Indicates an invalid definition or repetition level.
@@ -347,6 +348,20 @@ class HdfsParquetScanner : public HdfsScanner {
   /// need to issue another read.
   static const int64_t FOOTER_SIZE = 1024 * 100;
 
+  /// Index of the current row group being processed. Initialized to -1 which 
indicates
+  /// that we have not started processing the first row group yet (GetNext() 
has not yet
+  /// been called).
+  int32_t row_group_idx_;
+
+  /// Counts the number of rows processed for the current row group.
+  int64_t row_group_rows_read_;
+
+  /// Indicates whether we should advance to the next row group in the next 
GetNext().
+  /// Starts out as true to move to the very first row group.
+  bool advance_row_group_;
+
+  boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
+
   /// Cached runtime filter contexts, one for each filter that applies to this 
column.
   vector<const FilterContext*> filter_ctxs_;
 
@@ -411,32 +426,35 @@ class HdfsParquetScanner : public HdfsScanner {
 
   const char* filename() const { return metadata_range_->file(); }
 
-  /// Reads data using 'column_readers' to materialize top-level tuples.
-  ///
-  /// Returns true when the row group is complete and execution can be safely 
resumed.
-  /// Returns false if execution should be aborted due to:
-  /// - parse_error_ is set
-  /// - query is cancelled
-  /// - scan node limit was reached
-  /// - the scanned file can be skipped based on runtime filters
-  /// When false is returned the column_readers are left in an undefined state 
and
-  /// execution should be aborted immediately by the caller.
-  ///
-  /// 'row_group_idx' is used for calling into ValidateEndOfRowGroup() when 
the end
-  /// of the row group is reached.
-  ///
-  /// If 'filters_pass' is set to false by this method, the partition columns 
associated
-  /// with this row group did not pass all the runtime filters (and therefore 
only filter
-  /// contexts that apply only to partition columns are checked).
-  bool AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
-      int row_group_idx, bool* filters_pass);
+  virtual Status GetNextInternal(RowBatch* row_batch, bool* eos);
+
+  /// Advances 'row_group_idx_' to the next non-empty row group and initializes
+  /// the column readers to scan it. Recoverable errors are logged to the 
runtime
+  /// state. Only returns a non-OK status if a non-recoverable error is 
encountered
+  /// (or abort_on_error is true). If OK is returned, 'parse_status_' is 
guaranteed
+  /// to be OK as well.
+  Status NextRowGroup();
+
+  /// Reads data using 'column_readers' to materialize top-level tuples into 
'row_batch'.
+  /// Returns a non-OK status if a non-recoverable error was encountered and 
execution
+  /// of this query should be terminated immediately.
+  /// May set *skip_row_group to indicate that the current row group should be 
skipped,
+  /// e.g., due to a parse error, but execution should continue.
+  Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
+      RowBatch* row_batch, bool* skip_row_group);
+
+  /// Commit num_rows to the given row batch.
+  /// Returns OK if the query is not cancelled and hasn't exceeded any mem 
limits.
+  /// Scanner can call this with 0 rows to flush any pending resources 
(attached pools
+  /// and io buffers) to minimize memory consumption.
+  Status CommitRows(RowBatch* dst_batch, int num_rows);
 
   /// Evaluates runtime filters and conjuncts (if any) against the tuples in
-  /// 'scratch_batch_', and adds the surviving tuples to the output batch.
-  /// Transfers the ownership of tuple memory to the output batch when the
+  /// 'scratch_batch_', and adds the surviving tuples to the given batch.
+  /// Transfers the ownership of tuple memory to the target batch when the
   /// scratch batch is exhausted.
-  /// Returns the number of rows that should be committed to the output batch.
-  int TransferScratchTuples();
+  /// Returns the number of rows that should be committed to the given batch.
+  int TransferScratchTuples(RowBatch* dst_batch);
 
   /// Evaluates runtime filters (if any) against the given row. Returns true if
   /// they passed, false otherwise. Maintains the runtime filter stats, 
determines
@@ -474,8 +492,7 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Process the file footer and parse file_metadata_.  This should be called 
with the
   /// last FOOTER_SIZE bytes in context_.
-  /// *eosr is a return value.  If true, the scan range is complete (e.g. 
select count(*))
-  Status ProcessFooter(bool* eosr);
+  Status ProcessFooter();
 
   /// Populates 'column_readers' for the slots in 'tuple_desc', including 
creating child
   /// readers for any collections. Schema resolution is handled in this 
function as
@@ -516,6 +533,11 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Part of the HdfsScanner interface, not used in Parquet.
   Status InitNewRange() { return Status::OK(); };
+
+  /// Transfers the remaining resources backing tuples such as IO buffers and 
memory
+  /// from mem pools to the given row batch. Closes all column readers.
+  /// Should be called after completing a row group and when returning the 
last batch.
+  void FlushRowGroupResources(RowBatch* row_batch);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc 
b/be/src/exec/hdfs-rcfile-scanner.cc
index 3914845..65fe502 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -50,15 +50,16 @@ const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = 
{'R', 'C', 'F', 1};
 // Macro to convert between SerdeUtil errors to Status returns.
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
-HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* 
state)
-    : BaseSequenceScanner(scan_node, state) {
+HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* 
state,
+    bool add_batches_to_queue)
+    : BaseSequenceScanner(scan_node, state, add_batches_to_queue) {
 }
 
 HdfsRCFileScanner::~HdfsRCFileScanner() {
 }
 
-Status HdfsRCFileScanner::Prepare(ScannerContext* context) {
-  RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context));
+Status HdfsRCFileScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
   text_converter_.reset(
       new TextConverter(0, scan_node_->hdfs_table()->null_column_value()));
   scan_node_->IncNumScannersCodegenDisabled();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-rcfile-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.h 
b/be/src/exec/hdfs-rcfile-scanner.h
index b86f797..314c870 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -230,10 +230,11 @@ class Tuple;
 /// A scanner for reading RCFiles into tuples.
 class HdfsRCFileScanner : public BaseSequenceScanner {
  public:
-  HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state);
+  HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state,
+      bool add_batches_to_queue);
   virtual ~HdfsRCFileScanner();
 
-  virtual Status Prepare(ScannerContext* context);
+  virtual Status Open(ScannerContext* context);
 
   void DebugString(int indentation_level, std::stringstream* out) const;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 9a89ac3..7904071 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -64,8 +64,6 @@ DEFINE_bool(suppress_unknown_disk_id_warnings, false,
     " provide volume/disk information.");
 DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, 
in ms, "
     "that a scan node will wait for expected runtime filters to arrive.");
-DECLARE_string(cgroup_hierarchy_path);
-DECLARE_bool(enable_rm);
 
 #ifndef NDEBUG
 DECLARE_bool(skip_file_runtime_filtering);
@@ -194,7 +192,7 @@ bool HdfsScanNode::FilePassesFilterPredicates(const 
vector<FilterContext>& filte
   if (filter_ctxs_.size() == 0) return true;
   ScanRangeMetadata* metadata =
       reinterpret_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
-  if (!PartitionPassesFilterPredicates(metadata->partition_id, 
FilterStats::FILES_KEY,
+  if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
           filter_ctxs)) {
     for (int j = 0; j < file->splits.size(); ++j) {
       // Mark range as complete to ensure progress.
@@ -372,7 +370,7 @@ void* HdfsScanNode::GetCodegenFn(THdfsFileFormat::type 
type) {
   return it->second;
 }
 
-Status HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* 
partition,
+Status HdfsScanNode::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
     ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
   DCHECK(context != NULL);
   THdfsCompression::type compression =
@@ -386,20 +384,20 @@ Status 
HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* partition,
       if (compression == THdfsCompression::LZO) {
         scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, 
runtime_state_));
       } else {
-        scanner->reset(new HdfsTextScanner(this, runtime_state_));
+        scanner->reset(new HdfsTextScanner(this, runtime_state_, true));
       }
       break;
     case THdfsFileFormat::SEQUENCE_FILE:
-      scanner->reset(new HdfsSequenceScanner(this, runtime_state_));
+      scanner->reset(new HdfsSequenceScanner(this, runtime_state_, true));
       break;
     case THdfsFileFormat::RC_FILE:
-      scanner->reset(new HdfsRCFileScanner(this, runtime_state_));
+      scanner->reset(new HdfsRCFileScanner(this, runtime_state_, true));
       break;
     case THdfsFileFormat::AVRO:
-      scanner->reset(new HdfsAvroScanner(this, runtime_state_));
+      scanner->reset(new HdfsAvroScanner(this, runtime_state_, true));
       break;
     case THdfsFileFormat::PARQUET:
-      scanner->reset(new HdfsParquetScanner(this, runtime_state_));
+      scanner->reset(new HdfsParquetScanner(this, runtime_state_, true));
       break;
     default:
       return Status(Substitute("Unknown Hdfs file format type: $0",
@@ -408,8 +406,8 @@ Status 
HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* partition,
   DCHECK(scanner->get() != NULL);
   Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, 
runtime_state_);
   if (status.ok()) {
-    status = scanner->get()->Prepare(context);
-    if (!status.ok()) scanner->get()->Close();
+    status = scanner->get()->Open(context);
+    if (!status.ok()) scanner->get()->Close(scanner->get()->batch());
   } else {
     context->ClearStreams();
   }
@@ -1140,7 +1138,7 @@ void HdfsScanNode::ScannerThread() {
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 
-bool HdfsScanNode::PartitionPassesFilterPredicates(int32_t partition_id,
+bool HdfsScanNode::PartitionPassesFilters(int32_t partition_id,
     const string& stats_name, const vector<FilterContext>& filter_ctxs) {
   if (filter_ctxs.size() == 0) return true;
   DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
@@ -1198,8 +1196,7 @@ Status HdfsScanNode::ProcessSplit(const 
vector<FilterContext>& filter_ctxs,
   // done. See FilePassesFilterPredicates() for the correct logic to mark all 
splits in a
   // file as done; the correct fix here is to do that for every file in a 
thread-safe way.
   if (!FileFormatIsSequenceBased(partition->file_format())) {
-    if (!PartitionPassesFilterPredicates(partition_id, FilterStats::SPLITS_KEY,
-            filter_ctxs)) {
+    if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, 
filter_ctxs)) {
       // Avoid leaking unread buffers in scan_range.
       scan_range->Cancel(Status::CANCELLED);
       // Mark scan range as done.
@@ -1211,7 +1208,7 @@ Status HdfsScanNode::ProcessSplit(const 
vector<FilterContext>& filter_ctxs,
 
   ScannerContext context(runtime_state_, this, partition, scan_range, 
filter_ctxs);
   scoped_ptr<HdfsScanner> scanner;
-  Status status = CreateAndPrepareScanner(partition, &context, &scanner);
+  Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {
     // If preparation fails, avoid leaking unread buffers in the scan_range.
     scan_range->Cancel(status);
@@ -1243,7 +1240,9 @@ Status HdfsScanNode::ProcessSplit(const 
vector<FilterContext>& filter_ctxs,
     VLOG_QUERY << ss.str();
   }
 
-  scanner->Close();
+  // Transfer the remaining resources to the final row batch (if any) and add 
it to
+  // the row batch queue.
+  scanner->Close(scanner->batch());
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index c611a45..13b07bd 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -152,7 +152,7 @@ class HdfsScanNode : public ScanNode {
   /// Returns number of partition key slots.
   int num_materialized_partition_keys() const { return 
partition_key_slots_.size(); }
 
-  const TupleDescriptor* tuple_desc() { return tuple_desc_; }
+  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
 
   const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; }
 
@@ -262,7 +262,9 @@ class HdfsScanNode : public ScanNode {
 
   /// Called by the scanner when a range is complete.  Used to trigger done_ 
and
   /// to log progress.  This *must* only be called after the scanner has 
completely
-  /// finished the scan range (i.e. context->Flush()).
+  /// finished the scan range (i.e. context->Flush()), and has added the final
+  /// row batch to the row batch queue. Otherwise the last batch may be
+  /// lost due to racing with shutting down the row batch queue.
   void RangeComplete(const THdfsFileFormat::type& file_type,
       const THdfsCompression::type& compression_type);
   /// Same as above except for when multiple compression codecs were used
@@ -279,7 +281,7 @@ class HdfsScanNode : public ScanNode {
   void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
 
   /// Returns true if there are no materialized slots, such as a count(*) over 
the table.
-  inline bool IsZeroSlotTableScan() {
+  inline bool IsZeroSlotTableScan() const {
     return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
   }
 
@@ -305,12 +307,12 @@ class HdfsScanNode : public ScanNode {
   ///
   /// 'filter_ctxs' is either an empty list, in which case filtering is 
disabled and the
   /// function returns true, or a set of filter contexts to evaluate.
-  bool PartitionPassesFilterPredicates(int32_t partition_id,
-      const std::string& stats_name, const std::vector<FilterContext>& 
filter_ctxs);
+  bool PartitionPassesFilters(int32_t partition_id, const std::string& 
stats_name,
+      const std::vector<FilterContext>& filter_ctxs);
 
   const std::vector<FilterContext> filter_ctxs() const { return filter_ctxs_; }
 
- private:
+ protected:
   friend class ScannerContext;
 
   RuntimeState* runtime_state_;
@@ -503,15 +505,13 @@ class HdfsScanNode : public ScanNode {
   /// -1 if no callback is registered.
   int32_t rm_callback_id_;
 
-  /// Called when scanner threads are available for this scan node. This will
-  /// try to spin up as many scanner threads as the quota allows.
-  /// This is also called whenever a new range is added to the IoMgr to 'pull'
-  /// thread tokens if they are available.
+  /// Tries to spin up as many scanner threads as the quota allows. Called 
explicitly
+  /// (e.g., when adding new ranges) or when threads are available for this 
scan node.
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
 
-  /// Create and prepare new scanner for this partition type.
+  /// Create and open new scanner for this partition type.
   /// If the scanner is successfully created, it is returned in 'scanner'.
-  Status CreateAndPrepareScanner(HdfsPartitionDescriptor* partition,
+  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
       ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
 
   /// Main function for scanner thread. This thread pulls the next range to be

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 26763e9..0c5d6f8 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -34,6 +34,7 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* 
tuple_row, int row_
     FieldLocation* fields, int num_tuples, int max_added_tuples,
     int slots_per_tuple, int row_idx_start) {
 
+  DCHECK(add_batches_to_queue_);
   DCHECK(tuple_ != NULL);
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
   uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index ea101c0..c02ad18 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -53,9 +53,11 @@ using namespace strings;
 const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation";
 const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
 
-HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state)
+HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state,
+    bool add_batches_to_queue)
     : scan_node_(scan_node),
       state_(state),
+      add_batches_to_queue_(add_batches_to_queue),
       context_(NULL),
       stream_(NULL),
       scanner_conjunct_ctxs_(NULL),
@@ -75,6 +77,7 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, 
RuntimeState* state)
 HdfsScanner::HdfsScanner()
     : scan_node_(NULL),
       state_(NULL),
+      add_batches_to_queue_(true),
       context_(NULL),
       stream_(NULL),
       scanner_conjunct_ctxs_(NULL),
@@ -93,10 +96,9 @@ HdfsScanner::HdfsScanner()
 }
 
 HdfsScanner::~HdfsScanner() {
-  DCHECK(batch_ == NULL);
 }
 
-Status HdfsScanner::Prepare(ScannerContext* context) {
+Status HdfsScanner::Open(ScannerContext* context) {
   context_ = context;
   stream_ = context->GetStream();
 
@@ -116,13 +118,11 @@ Status HdfsScanner::Prepare(ScannerContext* context) {
       state_, context_->partition_descriptor()->partition_key_value_ctxs());
   template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_;
 
-  // Allocate a new row batch. May fail if mem limit is exceeded.
-  RETURN_IF_ERROR(StartNewRowBatch());
   decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), 
"DecompressionTime");
   return Status::OK();
 }
 
-void HdfsScanner::Close() {
+void HdfsScanner::Close(RowBatch* row_batch) {
   if (decompressor_.get() != NULL) decompressor_->Close();
   HdfsScanNode::ConjunctsMap::const_iterator iter = 
scanner_conjuncts_map_.begin();
   for (; iter != scanner_conjuncts_map_.end(); ++iter) {
@@ -155,6 +155,7 @@ Status 
HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
 }
 
 Status HdfsScanner::StartNewRowBatch() {
+  DCHECK(add_batches_to_queue_);
   batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
       scan_node_->mem_tracker());
   int64_t tuple_buffer_size;
@@ -164,6 +165,7 @@ Status HdfsScanner::StartNewRowBatch() {
 }
 
 int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** 
tuple_row_mem) {
+  DCHECK(add_batches_to_queue_);
   DCHECK(batch_ != NULL);
   DCHECK_GT(batch_->capacity(), batch_->num_rows());
   *pool = batch_->tuple_data_pool();
@@ -185,6 +187,7 @@ Status 
HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool
 
 // TODO(skye): have this check scan_node_->ReachedLimit() and get rid of 
manual check?
 Status HdfsScanner::CommitRows(int num_rows) {
+  DCHECK(add_batches_to_queue_);
   DCHECK(batch_ != NULL);
   DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
   batch_->CommitRows(num_rows);
@@ -209,13 +212,6 @@ Status HdfsScanner::CommitRows(int num_rows) {
   return Status::OK();
 }
 
-void HdfsScanner::AddFinalRowBatch() {
-  DCHECK(batch_ != NULL);
-  context_->ReleaseCompletedResources(batch_, /* done */ true);
-  scan_node_->AddMaterializedRowBatch(batch_);
-  batch_ = NULL;
-}
-
 // In this code path, no slots were materialized from the input files.  The 
only
 // slots are from partition keys.  This lets us simplify writing out the 
batches.
 //   1. template_tuple_ is the complete tuple.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 7a723b8..de39d6b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -69,10 +69,16 @@ struct FieldLocation {
 /// HdfsScanner is the superclass for different hdfs file format parsers.  
There is
 /// an instance of the scanner object created for each split, each driven by a 
different
 /// thread created by the scan node.  The scan node calls:
-/// 1. Prepare
-/// 2. ProcessSplit
-/// 3. Close
-/// ProcessSplit does not return until the split is complete (or an error) 
occurred.
+/// 1. Open()
+/// 2. ProcessSplit() or GetNext()*
+/// 3. Close()
+/// The scanner can be used in either of two modes, indicated via the 
add_batches_to_queue
+/// c'tor parameter.
+/// ProcessSplit() scans the split and adds materialized row batches to the 
scan node's
+/// row batch queue until the split is complete or an error occurred.
+/// GetNext() provides an iterator-like interface where the caller can request
+/// the next materialized row batch until the split has been fully processed 
(eos).
+///
 /// The HdfsScanner works in tandem with the ScannerContext to interleave IO
 /// and parsing.
 //
@@ -84,9 +90,9 @@ struct FieldLocation {
 /// 1. During the Prepare() phase of the ScanNode, the scanner subclass's 
static
 ///    Codegen() function will be called to perform codegen for that scanner 
type
 ///    for the specific tuple desc. This codegen'd function is cached in the 
HdfsScanNode.
-/// 2. During the GetNext() phase (where we create one Scanner for each scan 
range),
-///    the created scanner subclass can retrieve, from the scan node, the 
codegen'd
-///    function to use.
+/// 2. During the GetNext() phase of the scan node (where we create one 
Scanner for each
+///    scan range), the created scanner subclass can retrieve, from the scan 
node,
+///    the codegen'd function to use.
 /// This way, we only codegen once per scanner type, rather than once per 
scanner object.
 //
 /// This class also encapsulates row batch management.  Subclasses should call 
CommitRows()
@@ -100,21 +106,45 @@ class HdfsScanner {
   /// This probably ought to be a derived number from the environment.
   const static int FILE_BLOCK_SIZE = 4096;
 
-  HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state);
+  /// If 'add_batches_to_queue' is true the caller must call ProcessSplit() 
and not
+  /// GetNext(). Row batches will be added to the scan node's row batch queue, 
including
+  /// the final one in Close().
+  HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, bool 
add_batches_to_queue);
 
   virtual ~HdfsScanner();
 
   /// One-time initialisation of state that is constant across scan ranges.
-  virtual Status Prepare(ScannerContext* context);
+  virtual Status Open(ScannerContext* context);
+
+  /// Returns the next row batch from this scanner's split.
+  /// Recoverable errors are logged to the runtime state. Only returns a 
non-OK status
+  /// if a non-recoverable error is encountered (or abort_on_error is true). 
If OK is
+  /// returned, 'parse_status_' is guaranteed to be OK as well.
+  /// The memory referenced by the tuples is valid until this or any 
subsequently
+  /// returned batch is reset or destroyed.
+  /// Only valid to call if 'add_batches_to_queue_' is false.
+  Status GetNext(RowBatch* row_batch, bool* eos) {
+    DCHECK(!add_batches_to_queue_);
+    return GetNextInternal(row_batch, eos);
+  }
 
   /// Process an entire split, reading bytes from the context's streams.  
Context is
   /// initialized with the split data (e.g. template tuple, partition 
descriptor, etc).
   /// This function should only return on error or end of scan range.
+  /// Only valid to call if 'add_batches_to_queue_' is true.
   virtual Status ProcessSplit() = 0;
 
-  /// Release all resources the scanner has allocated.  This is the last 
chance for the
-  /// scanner to attach any resources to the ScannerContext object.
-  virtual void Close();
+  /// Transfers the ownership of memory backing returned tuples such as IO 
buffers
+  /// and memory in mem pools to the given row batch. If the row batch is NULL,
+  /// those resources are released instead. In any case, releases all other 
resources
+  /// that are not backing returned rows (e.g. temporary decompression 
buffers).
+  virtual void Close(RowBatch* row_batch);
+
+  /// Only valid to call if 'add_batches_to_queue_' is true.
+  RowBatch* batch() const {
+    DCHECK(add_batches_to_queue_);
+    return batch_;
+  }
 
   /// Scanner subclasses must implement these static functions as well.  
Unfortunately,
   /// c++ does not allow static virtual functions.
@@ -149,6 +179,11 @@ class HdfsScanner {
   /// RuntimeState for error reporting
   RuntimeState* state_;
 
+  /// True if the creator of this scanner intends to use ProcessSplit() and 
not GetNext().
+  /// Row batches will be added to the scan node's row batch queue, including 
the final
+  /// one in Close().
+  const bool add_batches_to_queue_;
+
   /// Context for this scanner
   ScannerContext* context_;
 
@@ -230,6 +265,13 @@ class HdfsScanner {
   /// Jitted write tuples function pointer.  Null if codegen is disabled.
   WriteTuplesFn write_tuples_fn_;
 
+  /// Implements GetNext(). Should be overridden by subclasses.
+  /// May be called even if 'add_batches_to_queue_' is true.
+  virtual Status GetNextInternal(RowBatch* row_batch, bool* eos) {
+    DCHECK(false) << "GetNextInternal() not implemented for this scanner 
type.";
+    return Status::OK();
+  }
+
   /// Initializes write_tuples_fn_ to the jitted function if codegen is 
possible.
   /// - partition - partition descriptor for this scanner/scan range
   /// - type - type for this scanner
@@ -238,6 +280,7 @@ class HdfsScanner {
       THdfsFileFormat::type type, const std::string& scanner_name);
 
   /// Set batch_ to a new row batch and update tuple_mem_ accordingly.
+  /// Only valid to call if 'add_batches_to_queue_' is true.
   Status StartNewRowBatch();
 
   /// Reset internal state for a new scan range.
@@ -251,6 +294,7 @@ class HdfsScanner {
   /// current row batch is complete and a new one is allocated).
   /// Memory returned from this call is invalidated after calling CommitRows().
   /// Callers must call GetMemory() again after calling this function.
+  /// Only valid to call if 'add_batches_to_queue_' is true.
   int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem);
 
   /// Gets memory for outputting tuples into the CollectionValue being 
constructed via
@@ -269,18 +313,15 @@ class HdfsScanner {
   /// Returns Status::OK if the query is not cancelled and hasn't exceeded any 
mem limits.
   /// Scanner can call this with 0 rows to flush any pending resources 
(attached pools
   /// and io buffers) to minimize memory consumption.
+  /// Only valid to call if 'add_batches_to_queue_' is true.
   Status CommitRows(int num_rows);
 
-  /// Attach all remaining resources from context_ to batch_ and send batch_ 
to the scan
-  /// node. This must be called after all rows have been committed and no 
further
-  /// resources are needed from context_ (in practice this will happen in each 
scanner
-  /// subclass's Close() implementation).
-  void AddFinalRowBatch();
-
   /// Release all memory in 'pool' to batch_. If commit_batch is true, the row 
batch
   /// will be committed. commit_batch should be true if the attached pool is 
expected
   /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem 
usage.
+  /// Only valid to call if 'add_batches_to_queue_' is true.
   void AttachPool(MemPool* pool, bool commit_batch) {
+    DCHECK(add_batches_to_queue_);
     DCHECK(batch_ != NULL);
     DCHECK(pool != NULL);
     batch_->tuple_data_pool()->AcquireData(pool, false);
@@ -315,6 +356,7 @@ class HdfsScanner {
   /// Returns the number of tuples added to the row batch.  This can be less 
than
   /// num_tuples/tuples_till_limit because of failed conjuncts.
   /// Returns -1 if parsing should be aborted due to parse errors.
+  /// Only valid to call if 'add_batches_to_queue_' is true.
   int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size,
       FieldLocation* fields, int num_tuples,
       int max_added_tuples, int slots_per_tuple, int row_start_indx);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc 
b/be/src/exec/hdfs-sequence-scanner.cc
index 0cd000f..025d4a4 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -38,8 +38,9 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] 
= {'S', 'E', 'Q', 6
 
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
-HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, 
RuntimeState* state)
-    : BaseSequenceScanner(scan_node, state),
+HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, 
RuntimeState* state,
+    bool add_batches_to_queue)
+    : BaseSequenceScanner(scan_node, state, add_batches_to_queue),
       unparsed_data_buffer_(NULL),
       num_buffered_records_in_compressed_block_(0) {
 }
@@ -86,8 +87,8 @@ Status HdfsSequenceScanner::InitNewRange() {
   return Status::OK();
 }
 
-Status HdfsSequenceScanner::Prepare(ScannerContext* context) {
-  RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context));
+Status HdfsSequenceScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
 
   // Allocate the scratch space for two pass parsing.  The most fields we can 
go
   // through in one parse pass is the batch size (tuples) * the number of 
fields per tuple

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h 
b/be/src/exec/hdfs-sequence-scanner.h
index ae7f3bb..5b3da92 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -159,12 +159,13 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   /// SeqFile file: {'S', 'E', 'Q', 6}
   static const uint8_t SEQFILE_VERSION_HEADER[4];
 
-  HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state);
+  HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state,
+      bool add_batches_to_queue);
 
   virtual ~HdfsSequenceScanner();
-  
+
   /// Implementation of HdfsScanner interface.
-  virtual Status Prepare(ScannerContext* context);
+  virtual Status Open(ScannerContext* context);
 
   /// Codegen writing tuples and evaluating predicates.
   static llvm::Function* Codegen(HdfsScanNode*,
@@ -176,9 +177,9 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   virtual Status ReadFileHeader();
   virtual Status InitNewRange();
   virtual Status ProcessRange();
-  
-  virtual THdfsFileFormat::type file_format() const { 
-    return THdfsFileFormat::SEQUENCE_FILE; 
+
+  virtual THdfsFileFormat::type file_format() const {
+    return THdfsFileFormat::SEQUENCE_FILE;
   }
 
  private:
@@ -213,11 +214,11 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   ///   record_ptr: pointer to the record.
   ///   record_len: length of the record
   Status GetRecord(uint8_t** record_ptr, int64_t *record_len);
-  
+
   /// Appends the current file and line to the RuntimeState's error log.
   /// row_idx is 0-based (in current batch) where the parse error occurred.
   virtual void LogRowParseError(int row_idx, std::stringstream*);
-  
+
   /// Helper class for picking fields and rows from delimited text.
   boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_;
   std::vector<FieldLocation> field_locations_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 5d80f06..ec7af87 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -47,8 +47,9 @@ const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index";
 // progress.
 const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024;
 
-HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state)
-    : HdfsScanner(scan_node, state),
+HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
+    bool add_batches_to_queue)
+    : HdfsScanner(scan_node, state, add_batches_to_queue),
       byte_buffer_ptr_(NULL),
       byte_buffer_end_(NULL),
       byte_buffer_read_size_(0),
@@ -145,6 +146,8 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* 
scan_node,
 }
 
 Status HdfsTextScanner::ProcessSplit() {
+  DCHECK(add_batches_to_queue_);
+
   // Reset state for new scan range
   RETURN_IF_ERROR(InitNewRange());
 
@@ -173,28 +176,28 @@ Status HdfsTextScanner::ProcessSplit() {
   return Status::OK();
 }
 
-void HdfsTextScanner::Close() {
+void HdfsTextScanner::Close(RowBatch* row_batch) {
   // Need to close the decompressor before releasing the resources at 
AddFinalRowBatch(),
   // because in some cases there is memory allocated in decompressor_'s 
temp_memory_pool_.
   if (decompressor_.get() != NULL) {
     decompressor_->Close();
     decompressor_.reset(NULL);
   }
-  if (batch_ != NULL) {
-    AttachPool(data_buffer_pool_.get(), false);
-    AttachPool(boundary_pool_.get(), false);
-    AddFinalRowBatch();
+  if (row_batch != NULL) {
+    row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
+    row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false);
+    context_->ReleaseCompletedResources(row_batch, true);
+    if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch);
   }
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
-  // Must happen after AddFinalRowBatch() is called.
   if (!only_parsing_header_) {
     scan_node_->RangeComplete(THdfsFileFormat::TEXT,
         stream_->file_desc()->file_compression);
   }
-  HdfsScanner::Close();
+  HdfsScanner::Close(row_batch);
 }
 
 Status HdfsTextScanner::InitNewRange() {
@@ -692,8 +695,8 @@ Function* HdfsTextScanner::Codegen(HdfsScanNode* node,
   return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn);
 }
 
-Status HdfsTextScanner::Prepare(ScannerContext* context) {
-  RETURN_IF_ERROR(HdfsScanner::Prepare(context));
+Status HdfsTextScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
 
   parse_delimiter_timer_ = ADD_CHILD_TIMER(scan_node_->runtime_profile(),
       "DelimiterParseTime", ScanNode::SCANNER_THREAD_TOTAL_WALLCLOCK_TIME);
@@ -704,6 +707,8 @@ Status HdfsTextScanner::Prepare(ScannerContext* context) {
   field_locations_.resize(state_->batch_size() * 
scan_node_->materialized_slots().size());
   row_end_locations_.resize(state_->batch_size());
 
+  // Allocate a new row batch. May fail if mem limit is exceeded.
+  RETURN_IF_ERROR(StartNewRowBatch());
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index e8dbe08..74784b8 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -44,13 +44,14 @@ struct HdfsFileDesc;
 /// scanner for the tuple directly after it.
 class HdfsTextScanner : public HdfsScanner {
  public:
-  HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state);
+  HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
+      bool add_batches_to_queue);
   virtual ~HdfsTextScanner();
 
   /// Implementation of HdfsScanner interface.
-  virtual Status Prepare(ScannerContext* context);
+  virtual Status Open(ScannerContext* context);
   virtual Status ProcessSplit();
-  virtual void Close();
+  virtual void Close(RowBatch* row_batch);
 
   /// Issue io manager byte ranges for 'files'.
   static Status IssueInitialRanges(HdfsScanNode* scan_node,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h 
b/be/src/exec/parquet-column-readers.h
index 3c26084..d308091 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -224,6 +224,10 @@ class ParquetColumnReader {
   /// Returns true if this column reader has reached the end of the row group.
   inline bool RowGroupAtEnd() { return rep_level_ == 
HdfsParquetScanner::ROW_GROUP_END; }
 
+  /// Transfers the remaining resources backing tuples to the given row batch,
+  /// and frees up other resources.
+  virtual void Close(RowBatch* row_batch) = 0;
+
  protected:
   HdfsParquetScanner* parent_;
   const SchemaNode& node_;
@@ -330,8 +334,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     return Status::OK();
   }
 
-  /// Called once when the scanner is complete for final cleanup.
-  void Close() {
+  virtual void Close(RowBatch* row_batch) {
+    if (decompressed_data_pool_.get() != NULL) {
+      row_batch->tuple_data_pool()->AcquireData(decompressed_data_pool_.get(), 
false);
+    }
     if (decompressor_.get() != NULL) decompressor_->Close();
   }
 
@@ -475,6 +481,12 @@ class CollectionColumnReader : public ParquetColumnReader {
     pos_current_value_ = -1;
   }
 
+  virtual void Close(RowBatch* row_batch) {
+    for (ParquetColumnReader* child_reader: children_) {
+      child_reader->Close(row_batch);
+    }
+  }
+
  private:
   /// Column readers of fields contained within this collection. There is at 
least one
   /// child reader per collection reader. Child readers either materialize 
slots in the

Reply via email to