IMPALA-3905: Implements HdfsScanner::GetNext() for text scans.

Implements HdfsTextScanner::GetNext() and changes ProcessSplit() to
repeatedly call GetNext(), so that the core scanning code is shared
between the legacy ProcessSplit() interface and the new GetNext()
interface.

These changes were tricky:
- The scanner used to rely on being able to attach a batch to the
  row-batch queue in order to free resources
- This patch attempts to preserve that resource-freeing behavior by
  releasing completed resources as soon as possible (see the sketch
  after this list)
- In particular, the scanner attempts to skip corrupt/invalid data
  blocks, and we should avoid accumulating memory unnecessarily while
  doing so
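
The hand-off idea is that memory a returned batch may still reference
is transferred to that batch's pool as soon as the scanner is done with
it, and freed outright when there is no destination batch (e.g. when
skipping bad data). A minimal stand-in sketch (hypothetical names, not
the actual MemPool class) of this pattern:

  #include <cstdint>
  #include <vector>

  struct MemPool {                                  // stand-in for impala::MemPool
    std::vector<std::vector<uint8_t>> chunks;
    void AcquireData(MemPool* src) {                // take ownership of src's chunks
      for (auto& c : src->chunks) chunks.push_back(std::move(c));
      src->chunks.clear();
    }
    void FreeAll() { chunks.clear(); }
  };

  // Called once a decompression/scratch buffer is no longer needed by the scanner.
  void ReleaseCompleted(MemPool* scratch, MemPool* dst_batch_pool) {
    if (dst_batch_pool != nullptr) {
      dst_batch_pool->AcquireData(scratch);         // the batch may still reference the bytes
    } else {
      scratch->FreeAll();                           // nothing references it: free eagerly
    }
  }

  int main() {
    MemPool scratch, batch_pool;
    scratch.chunks.push_back(std::vector<uint8_t>(1024));
    ReleaseCompleted(&scratch, &batch_pool);        // handed off, not accumulated
    ReleaseCompleted(&scratch, nullptr);            // or freed when skipping bad data
  }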

The other changes are mostly straightforward:
- Add a RowBatch parameter to various functions
- Add a MemPool parameter to various functions for attaching the memory
  of completed resources that may still be referenced by returned
  batches
- Change Close() to free all resources when a nullptr RowBatch is
  passed, as sketched below
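
A minimal stand-in sketch (hypothetical names) of the new Close()
contract: with a final RowBatch the remaining pools are attached to it,
with nullptr everything is freed directly. It mirrors the
HdfsTextScanner::Close() hunk in the patch below:

  #include <cstdint>
  #include <vector>

  struct MemPool {
    std::vector<std::vector<uint8_t>> chunks;
    void AcquireData(MemPool* src) {
      for (auto& c : src->chunks) chunks.push_back(std::move(c));
      src->chunks.clear();
    }
    void FreeAll() { chunks.clear(); }
  };

  struct RowBatch { MemPool tuple_data_pool; };

  struct ScannerSketch {
    MemPool template_tuple_pool, data_buffer_pool, boundary_pool;

    void Close(RowBatch* row_batch) {
      if (row_batch != nullptr) {
        // Final batch: hand over any memory its rows might still reference.
        row_batch->tuple_data_pool.AcquireData(&template_tuple_pool);
        row_batch->tuple_data_pool.AcquireData(&data_buffer_pool);
        row_batch->tuple_data_pool.AcquireData(&boundary_pool);
      } else {
        // No batch (e.g. cancelled or failed range): free all resources directly.
        template_tuple_pool.FreeAll();
        data_buffer_pool.FreeAll();
        boundary_pool.FreeAll();
      }
    }
  };

  int main() { ScannerSketch s; RowBatch rb; s.Close(&rb); ScannerSketch t; t.Close(nullptr); }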

Testing:
- Exhaustive tests passed on debug
- Core tests passed on asan
- TODO: Perf testing on cluster

Change-Id: Id193aa223434d7cc40061a42f81bbb29dcd0404b
Reviewed-on: http://gerrit.cloudera.org:8080/6000
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Alex Behm <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/be8d1512
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/be8d1512
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/be8d1512

Branch: refs/heads/master
Commit: be8d15122de9b1ee35bdd2de43b8daaaa7547098
Parents: 85d7f5e
Author: Alex Behm <[email protected]>
Authored: Tue Sep 13 10:05:00 2016 -0700
Committer: Alex Behm <[email protected]>
Committed: Tue Apr 4 01:21:07 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  12 +-
 be/src/exec/hdfs-parquet-scanner.h              |   5 -
 be/src/exec/hdfs-scan-node-base.cc              |  13 +
 be/src/exec/hdfs-scan-node-base.h               |  13 +
 be/src/exec/hdfs-scan-node-mt.cc                |   3 +-
 be/src/exec/hdfs-scan-node.cc                   |  13 -
 be/src/exec/hdfs-scan-node.h                    |  15 -
 be/src/exec/hdfs-scanner-ir.cc                  |   2 -
 be/src/exec/hdfs-scanner.cc                     |  26 +-
 be/src/exec/hdfs-scanner.h                      |  24 +-
 be/src/exec/hdfs-text-scanner.cc                | 298 +++++++++++--------
 be/src/exec/hdfs-text-scanner.h                 |  80 +++--
 be/src/exec/scanner-context.cc                  |   2 +-
 be/src/exec/scanner-context.h                   |   5 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   4 +-
 15 files changed, 296 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index f21ccff..aba2bef 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -404,7 +404,8 @@ Status HdfsParquetScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
   HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
   do {
-    StartNewParquetRowBatch();
+    batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+        scan_node_->mem_tracker());
     RETURN_IF_ERROR(GetNextInternal(batch_));
     scan_node->AddMaterializedRowBatch(batch_);
     ++row_batches_produced_;
@@ -414,7 +415,8 @@ Status HdfsParquetScanner::ProcessSplit() {
   } while (!eos_ && !scan_node_->ReachedLimit());
 
   // Transfer the remaining resources to this new batch in Close().
-  StartNewParquetRowBatch();
+  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+      scan_node_->mem_tracker());
   return Status::OK();
 }
 
@@ -976,12 +978,6 @@ Status HdfsParquetScanner::AssembleRows(
   return Status::OK();
 }
 
-void HdfsParquetScanner::StartNewParquetRowBatch() {
-  DCHECK(scan_node_->HasRowBatchQueue());
-  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
-      scan_node_->mem_tracker());
-}
-
 Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   DCHECK(dst_batch != NULL);
   dst_batch->CommitRows(num_rows);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h 
b/be/src/exec/hdfs-parquet-scanner.h
index 34bfcc9..18c14fe 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -509,11 +509,6 @@ class HdfsParquetScanner : public HdfsScanner {
   Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
       RowBatch* row_batch, bool* skip_row_group);
 
-  /// Set 'batch_' to a new row batch. Unlike the similarly named function in
-  /// HdfsScanner, this function will not allocate the tuple buffer. Only valid
-  /// to call if 'add_batches_to_queue_' is true.
-  void StartNewParquetRowBatch();
-
   /// Commit num_rows to the given row batch.
   /// Returns OK if the query is not cancelled and hasn't exceeded any mem 
limits.
   /// Scanner can call this with 0 rows to flush any pending resources 
(attached pools

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index e478b69..238155f 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -625,6 +625,19 @@ HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(const string& 
filename) {
   return file_descs_[filename];
 }
 
+void HdfsScanNodeBase::SetFileMetadata(const string& filename, void* metadata) 
{
+  unique_lock<mutex> l(metadata_lock_);
+  DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
+  per_file_metadata_[filename] = metadata;
+}
+
+void* HdfsScanNodeBase::GetFileMetadata(const string& filename) {
+  unique_lock<mutex> l(metadata_lock_);
+  map<string, void*>::iterator it = per_file_metadata_.find(filename);
+  if (it == per_file_metadata_.end()) return NULL;
+  return it->second;
+}
+
 void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
   CodegendFnMap::iterator it = codegend_fn_map_.find(type);
   if (it == codegend_fn_map_.end()) return NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h 
b/be/src/exec/hdfs-scan-node-base.h
index cde9574..7777d4d 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -233,6 +233,14 @@ class HdfsScanNodeBase : public ScanNode {
   /// Returns the file desc for 'filename'.  Returns NULL if filename is 
invalid.
   HdfsFileDesc* GetFileDesc(const std::string& filename);
 
+  /// Sets the scanner specific metadata for 'filename'. Scanners can use this 
to store
+  /// file header information. Thread safe.
+  void SetFileMetadata(const std::string& filename, void* metadata);
+
+  /// Returns the scanner specific metadata for 'filename'. Returns NULL if 
there is no
+  /// metadata. Thread safe.
+  void* GetFileMetadata(const std::string& filename);
+
   /// Called by scanners when a range is complete. Used to record progress.
   /// This *must* only be called after a scanner has completely finished its
   /// scan range (i.e. context->Flush()), and has returned the final row batch.
@@ -336,6 +344,11 @@ class HdfsScanNodeBase : public ScanNode {
   typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> 
FileFormatsMap;
   FileFormatsMap per_type_files_;
 
+  /// Scanner specific per file metadata (e.g. header information) and 
associated lock.
+  /// TODO: Remove this lock when removing the legacy scanners and scan nodes.
+  boost::mutex metadata_lock_;
+  std::map<std::string, void*> per_file_metadata_;
+
   /// Conjuncts for each materialized tuple (top-level row batch tuples and 
collection
   /// item tuples). Includes a copy of ExecNode.conjuncts_.
   ConjunctsMap conjuncts_map_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 50936b8..9fad46e 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -45,7 +45,8 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
   // Return an error if this scan node has been assigned a range that is not 
supported
   // because the scanner of the corresponding file format does not implement GetNext().
   for (const auto& files: per_type_files_) {
-    if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET) {
+    if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET
+        && files.first != THdfsFileFormat::TEXT) {
       stringstream msg;
       msg << "Unsupported file format with HdfsScanNodeMt: " << files.first;
       return Status(msg.str());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 03217e0..c3972b2 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -239,19 +239,6 @@ void HdfsScanNode::Close(RuntimeState* state) {
   HdfsScanNodeBase::Close(state);
 }
 
-void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) {
-  unique_lock<mutex> l(metadata_lock_);
-  DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
-  per_file_metadata_[filename] = metadata;
-}
-
-void* HdfsScanNode::GetFileMetadata(const string& filename) {
-  unique_lock<mutex> l(metadata_lock_);
-  map<string, void*>::iterator it = per_file_metadata_.find(filename);
-  if (it == per_file_metadata_.end()) return NULL;
-  return it->second;
-}
-
 void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
     const std::vector<THdfsCompression::type>& compression_type) {
   lock_guard<SpinLock> l(file_type_counts_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index f8064b1..41647da 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -85,16 +85,6 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// This function will block if materialized_row_batches_ is full.
   void AddMaterializedRowBatch(RowBatch* row_batch);
 
-  /// Sets the scanner specific metadata for 'filename'.
-  /// This is thread safe.
-  void SetFileMetadata(const std::string& filename, void* metadata);
-
-  /// Gets scanner specific metadata for 'filename'.  Scanners can use this to 
store
-  /// file header information.
-  /// Returns NULL if there is no metadata.
-  /// This is thread safe.
-  void* GetFileMetadata(const std::string& filename);
-
   /// Called by scanners when a range is complete. Used to record progress and 
set done_.
   /// This *must* only be called after a scanner has completely finished its
   /// scan range (i.e. context->Flush()), and has added the final row batch to 
the row
@@ -115,11 +105,6 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// scanner threads.
   int64_t scanner_thread_bytes_required_;
 
-  /// Scanner specific per file metadata (e.g. header information) and 
associated lock.
-  /// This lock cannot be taken together with any other locks except lock_.
-  boost::mutex metadata_lock_;
-  std::map<std::string, void*> per_file_metadata_;
-
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index e6da220..5b474d1 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -36,8 +36,6 @@ using namespace impala;
 int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int 
row_size,
     FieldLocation* fields, int num_tuples, int max_added_tuples,
     int slots_per_tuple, int row_idx_start) {
-
-  DCHECK(scan_node_->HasRowBatchQueue());
   DCHECK(tuple_ != NULL);
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
   uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 64a922d..d613b15 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -192,26 +192,30 @@ Status 
HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool
   return Status::OK();
 }
 
-// TODO(skye): have this check scan_node_->ReachedLimit() and get rid of 
manual check?
-Status HdfsScanner::CommitRows(int num_rows) {
-  DCHECK(scan_node_->HasRowBatchQueue());
-  DCHECK(batch_ != NULL);
-  DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
-  batch_->CommitRows(num_rows);
+Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* 
row_batch) {
+  DCHECK(batch_ != NULL || !scan_node_->HasRowBatchQueue());
+  DCHECK(batch_ == row_batch || !scan_node_->HasRowBatchQueue());
+  DCHECK(!enqueue_if_full || scan_node_->HasRowBatchQueue());
+  DCHECK_LE(num_rows, row_batch->capacity() - row_batch->num_rows());
+  row_batch->CommitRows(num_rows);
   tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * 
num_rows;
+  tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
 
   // We need to pass the row batch to the scan node if there is too much 
memory attached,
   // which can happen if the query is very selective. We need to release 
memory even
   // if no rows passed predicates.
-  if (batch_->AtCapacity() || context_->num_completed_io_buffers() > 0) {
-    context_->ReleaseCompletedResources(batch_, /* done */ false);
-    static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(batch_);
-    RETURN_IF_ERROR(StartNewRowBatch());
+  if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) {
+    context_->ReleaseCompletedResources(row_batch, /* done */ false);
+    if (enqueue_if_full) {
+      
static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+      RETURN_IF_ERROR(StartNewRowBatch());
+    }
   }
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
   RETURN_IF_ERROR(state_->GetQueryStatus());
-  // Free local expr allocations for this thread
+  // Free local expr allocations for this thread to avoid accumulating too much
+  // memory from evaluating the scanner conjuncts.
   for (const auto& entry: scanner_conjuncts_map_) {
     ExprContext::FreeLocalAllocations(entry.second);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index df21fb2..f61c5fc 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -322,13 +322,23 @@ class HdfsScanner {
   Status GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool,
       Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows);
 
-  /// Commit num_rows to the current row batch.  If this completes, the row 
batch is
-  /// enqueued with the scan node and StartNewRowBatch() is called.
-  /// Returns Status::OK if the query is not cancelled and hasn't exceeded any 
mem limits.
-  /// Scanner can call this with 0 rows to flush any pending resources 
(attached pools
-  /// and io buffers) to minimize memory consumption.
-  /// Only valid to call if the parent scan node is multi-threaded.
-  Status CommitRows(int num_rows);
+  /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' 
accordingly.
+  /// Attaches completed resources from 'context_' to 'row_batch' if necessary.
+  /// Frees local expr allocations.
+  /// If 'enqueue_if_full' is true and 'row_batch' is at capacity after 
committing the
+  /// rows, then 'row_batch' is added to the queue, and a new batch is created 
with
+  /// StartNewRowBatch(). It is only valid to pass true for 'enqueue_if_full' 
if the
+  /// parent scan node is multi-threaded.
+  /// Returns non-OK if 'context_' is cancelled or the query status in 
'state_' is
+  /// non-OK.
+  Status CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch);
+
+  /// Calls the above CommitRows() passing true for 'queue_if_full', and 
'batch_' as the
+  /// row batch. Only valid to call if the parent scan node is multi-threaded.
+  Status CommitRows(int num_rows) {
+    DCHECK(scan_node_->HasRowBatchQueue());
+    return CommitRows(num_rows, true, batch_);
+  }
 
   /// Release all memory in 'pool' to batch_. If 'commit_batch' is true, the 
row batch
   /// will be committed. 'commit_batch' should be true if the attached pool is 
expected

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 07281df..ced7ab1 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -52,15 +52,20 @@ const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 
1024;
 
 HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* 
state)
     : HdfsScanner(scan_node, state),
-      byte_buffer_ptr_(NULL),
-      byte_buffer_end_(NULL),
+      byte_buffer_ptr_(nullptr),
+      byte_buffer_end_(nullptr),
       byte_buffer_read_size_(0),
       only_parsing_header_(false),
+      scan_state_(CONSTRUCTED),
       boundary_pool_(new MemPool(scan_node->mem_tracker())),
       boundary_row_(boundary_pool_.get()),
       boundary_column_(boundary_pool_.get()),
       slot_idx_(0),
-      error_in_row_(false) {
+      batch_start_ptr_(nullptr),
+      error_in_row_(false),
+      partial_tuple_(nullptr),
+      partial_tuple_empty_(true),
+      parse_delimiter_timer_(nullptr) {
 }
 
 HdfsTextScanner::~HdfsTextScanner() {
@@ -149,43 +154,29 @@ Status 
HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
 
 Status HdfsTextScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
-
-  // Reset state for new scan range
-  RETURN_IF_ERROR(InitNewRange());
-
-  // Find the first tuple.  If tuple_found is false, it means we went through 
the entire
-  // scan range without finding a single tuple.  The bytes will be picked up 
by the scan
-  // range before.
-  bool tuple_found;
-  RETURN_IF_ERROR(FindFirstTuple(&tuple_found));
-
-  if (tuple_found) {
-    // Update the decompressor depending on the compression type of the file 
in the
-    // context.
-    DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY)
-        << "FE should have generated SNAPPY_BLOCKED instead.";
-    
RETURN_IF_ERROR(UpdateDecompressor(stream_->file_desc()->file_compression));
-
-    // Process the scan range.
-    int dummy_num_tuples;
-    RETURN_IF_ERROR(ProcessRange(&dummy_num_tuples, false));
-
-    // Finish up reading past the scan range.
-    RETURN_IF_ERROR(FinishScanRange());
-  }
-
-  // All done with this scan range.
+  HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
+  do {
+    batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+        scan_node_->mem_tracker());
+    RETURN_IF_ERROR(GetNextInternal(batch_));
+    scan_node->AddMaterializedRowBatch(batch_);
+  } while (!eos_ && !scan_node_->ReachedLimit());
+
+  // Transfer the remaining resources to this new batch in Close().
+  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+      scan_node_->mem_tracker());
   return Status::OK();
 }
 
 void HdfsTextScanner::Close(RowBatch* row_batch) {
-  // Need to close the decompressor before releasing the resources at 
AddFinalRowBatch(),
-  // because in some cases there is memory allocated in decompressor_'s 
temp_memory_pool_.
-  if (decompressor_.get() != NULL) {
+  // Need to close the decompressor before transferring the remaining 
resources to
+  // 'row_batch' because in some cases there is memory allocated in the 
decompressor_'s
+  // temp_memory_pool_.
+  if (decompressor_ != nullptr) {
     decompressor_->Close();
-    decompressor_.reset(NULL);
+    decompressor_.reset();
   }
-  if (row_batch != NULL) {
+  if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), 
false);
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false);
@@ -194,10 +185,13 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
       
static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
     }
   } else {
-    if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll();
+    if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll();
+    if (data_buffer_pool_ != nullptr) data_buffer_pool_->FreeAll();
+    if (boundary_pool_ != nullptr) boundary_pool_->FreeAll();
+    context_->ReleaseCompletedResources(nullptr, true);
   }
 
-  // Verify all resources (if any) have been transferred.
+  // Verify all resources (if any) have been transferred or freed.
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0);
@@ -210,12 +204,18 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
 }
 
 Status HdfsTextScanner::InitNewRange() {
+  DCHECK_EQ(scan_state_, CONSTRUCTED);
   // Compressed text does not reference data in the io buffers directly. In 
such case, we
   // can recycle the buffers in the stream_ more promptly.
   if (stream_->file_desc()->file_compression != THdfsCompression::NONE) {
     stream_->set_contains_tuple_data(false);
   }
 
+  // Update the decompressor based on the compression type of the file in the 
context.
+  DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY)
+      << "FE should have generated SNAPPY_BLOCKED instead.";
+  RETURN_IF_ERROR(UpdateDecompressor(stream_->file_desc()->file_compression));
+
   HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor();
   char field_delim = hdfs_partition->field_delim();
   char collection_delim = hdfs_partition->collection_delim();
@@ -233,38 +233,34 @@ Status HdfsTextScanner::InitNewRange() {
       state_->strict_mode()));
 
   RETURN_IF_ERROR(ResetScanner());
+  scan_state_ = SCAN_RANGE_INITIALIZED;
   return Status::OK();
 }
 
 Status HdfsTextScanner::ResetScanner() {
-  error_in_row_ = false;
-
-  // Note - this initialisation relies on the assumption that N partition keys 
will occupy
-  // entries 0 through N-1 in column_idx_to_slot_idx. If this changes, we will 
need
-  // another layer of indirection to map text-file column indexes onto the
-  // column_idx_to_slot_idx table used below.
+  // Assumes that N partition keys occupy entries 0 through N-1 in 
materialized_slots_.
+  // If this changes, we will need another layer of indirection to map 
text-file column
+  // indexes to their destination slot.
   slot_idx_ = 0;
 
+  error_in_row_ = false;
   boundary_column_.Clear();
   boundary_row_.Clear();
   delimited_text_parser_->ParserReset();
-
+  byte_buffer_ptr_ = byte_buffer_end_ = nullptr;
+  partial_tuple_ = Tuple::Create(tuple_byte_size_, boundary_pool_.get());
   partial_tuple_empty_ = true;
-  byte_buffer_ptr_ = byte_buffer_end_ = NULL;
-
-  partial_tuple_ =
-      Tuple::Create(scan_node_->tuple_desc()->byte_size(), 
boundary_pool_.get());
 
   // Initialize codegen fn
   RETURN_IF_ERROR(InitializeWriteTuplesFn(
-    context_->partition_descriptor(), THdfsFileFormat::TEXT, 
"HdfsTextScanner"));
+      context_->partition_descriptor(), THdfsFileFormat::TEXT, 
"HdfsTextScanner"));
   return Status::OK();
 }
 
-Status HdfsTextScanner::FinishScanRange() {
-  if (scan_node_->ReachedLimit()) return Status::OK();
-
+Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
+  DCHECK(!row_batch->AtCapacity());
   DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
+
   bool split_delimiter;
   RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
   if (split_delimiter) {
@@ -275,11 +271,13 @@ Status HdfsTextScanner::FinishScanRange() {
     DCHECK(partial_tuple_empty_);
     DCHECK(boundary_column_.IsEmpty());
     DCHECK(boundary_row_.IsEmpty());
+    scan_state_ = DONE;
     return Status::OK();
   }
 
   // For text we always need to scan past the scan range to find the next 
delimiter
   while (true) {
+    DCHECK_EQ(scan_state_, PAST_SCAN_RANGE);
     bool eosr = true;
     Status status = Status::OK();
     byte_buffer_read_size_ = 0;
@@ -287,8 +285,8 @@ Status HdfsTextScanner::FinishScanRange() {
     // If compressed text, then there is nothing more to be read.
     // TODO: calling FillByteBuffer() at eof() can cause
     // ScannerContext::Stream::GetNextBuffer to DCHECK. Fix this.
-    if (decompressor_.get() == NULL && !stream_->eof()) {
-      status = FillByteBuffer(&eosr, NEXT_BLOCK_READ_SIZE);
+    if (decompressor_.get() == nullptr && !stream_->eof()) {
+      status = FillByteBuffer(row_batch->tuple_data_pool(), &eosr, 
NEXT_BLOCK_READ_SIZE);
     }
 
     if (!status.ok() || byte_buffer_read_size_ == 0) {
@@ -311,25 +309,25 @@ Status HdfsTextScanner::FinishScanRange() {
         
RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(),
             &col, &num_fields, &field_locations_[0]));
 
-        MemPool* pool;
-        TupleRow* tuple_row_mem;
-        int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
+        TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
+        int max_tuples = row_batch->capacity() - row_batch->num_rows();
         DCHECK_GE(max_tuples, 1);
         // Set variables for proper error outputting on boundary tuple
         batch_start_ptr_ = boundary_row_.buffer();
         row_end_locations_[0] = batch_start_ptr_ + boundary_row_.len();
-        int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1);
+        int num_tuples =
+            WriteFields(num_fields, 1, row_batch->tuple_data_pool(), 
tuple_row_mem);
         DCHECK_LE(num_tuples, 1);
         DCHECK_GE(num_tuples, 0);
         COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
-        RETURN_IF_ERROR(CommitRows(num_tuples));
+        RETURN_IF_ERROR(CommitRows(num_tuples, false, row_batch));
       } else if (delimited_text_parser_->HasUnfinishedTuple()) {
         DCHECK(scan_node_->materialized_slots().empty());
         DCHECK_EQ(scan_node_->num_materialized_partition_keys(), 0);
         // If no fields are materialized we do not update partial_tuple_empty_,
         // boundary_column_, or boundary_row_. However, we still need to 
handle the case
         // of partial tuple due to missing tuple delimiter at the end of file.
-        RETURN_IF_ERROR(CommitRows(1));
+        RETURN_IF_ERROR(CommitRows(1, false, row_batch));
       }
       break;
     }
@@ -337,27 +335,29 @@ Status HdfsTextScanner::FinishScanRange() {
     DCHECK(eosr);
 
     int num_tuples;
-    RETURN_IF_ERROR(ProcessRange(&num_tuples, true));
+    RETURN_IF_ERROR(ProcessRange(row_batch, &num_tuples));
     if (num_tuples == 1) break;
     DCHECK_EQ(num_tuples, 0);
   }
 
+  scan_state_ = DONE;
   return Status::OK();
 }
 
-Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
-  bool eosr = past_scan_range || stream_->eosr();
+Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
+  DCHECK(scan_state_ == FIRST_TUPLE_FOUND || scan_state_ == PAST_SCAN_RANGE);
 
+  MemPool* pool = row_batch->tuple_data_pool();
+  bool eosr = stream_->eosr() || scan_state_ == PAST_SCAN_RANGE;
   while (true) {
     if (!eosr && byte_buffer_ptr_ == byte_buffer_end_) {
-      RETURN_IF_ERROR(FillByteBuffer(&eosr));
+      RETURN_IF_ERROR(FillByteBuffer(pool, &eosr));
     }
 
-    MemPool* pool;
-    TupleRow* tuple_row_mem;
-    int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem);
+    TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
+    int max_tuples = row_batch->capacity() - row_batch->num_rows();
 
-    if (past_scan_range) {
+    if (scan_state_ == PAST_SCAN_RANGE) {
       // byte_buffer_ptr_ is already set from FinishScanRange()
       max_tuples = 1;
       eosr = true;
@@ -389,7 +389,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool 
past_scan_range) {
         RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
         boundary_column_.Clear();
       }
-      num_tuples_materialized = WriteFields(pool, tuple_row_mem, num_fields, 
*num_tuples);
+      num_tuples_materialized = WriteFields(num_fields, *num_tuples, pool, 
tuple_row_mem);
       DCHECK_GE(num_tuples_materialized, 0);
       RETURN_IF_ERROR(parse_status_);
       if (*num_tuples > 0) {
@@ -402,12 +402,13 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, 
bool past_scan_range) {
       boundary_row_.Clear();
       num_tuples_materialized = WriteTemplateTuples(tuple_row_mem, 
*num_tuples);
     }
+    COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples);
 
     // Save contents that are split across buffers if we are going to return 
this column
     if (col_start != byte_buffer_ptr_ && 
delimited_text_parser_->ReturnCurrentColumn()) {
       DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
       RETURN_IF_ERROR(boundary_column_.Append(col_start, byte_buffer_ptr_ - 
col_start));
-      char* last_row = NULL;
+      char* last_row = nullptr;
       if (*num_tuples == 0) {
         last_row = batch_start_ptr_;
       } else {
@@ -415,25 +416,65 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, 
bool past_scan_range) {
       }
       RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - 
last_row));
     }
-    COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples);
+    RETURN_IF_ERROR(CommitRows(num_tuples_materialized, false, row_batch));
 
-    // Commit the rows to the row batch and scan node
-    RETURN_IF_ERROR(CommitRows(num_tuples_materialized));
+    // Already past the scan range and attempting to complete the last row.
+    if (scan_state_ == PAST_SCAN_RANGE) break;
 
-    // Done with this buffer and the scan range
-    if ((byte_buffer_ptr_ == byte_buffer_end_ && eosr) || past_scan_range) {
+    // Scan range is done. Transition to PAST_SCAN_RANGE.
+    if (byte_buffer_ptr_ == byte_buffer_end_ && eosr) {
+      scan_state_ = PAST_SCAN_RANGE;
       break;
     }
 
-    if (scan_node_->ReachedLimit()) return Status::OK();
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
   }
   return Status::OK();
 }
 
-Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) {
+Status HdfsTextScanner::GetNextInternal(RowBatch* row_batch) {
+  DCHECK(!eos_);
+  DCHECK_GE(scan_state_, SCAN_RANGE_INITIALIZED);
+  DCHECK_NE(scan_state_, DONE);
+
+  if (scan_state_ == SCAN_RANGE_INITIALIZED) {
+    // Find the first tuple.  If tuple_found is false, it means we went 
through the entire
+    // scan range without finding a single tuple.  The bytes will be picked up 
by the scan
+    // range before.
+    RETURN_IF_ERROR(FindFirstTuple(row_batch->tuple_data_pool()));
+    if (scan_state_ != FIRST_TUPLE_FOUND) {
+      eos_ = true;
+      scan_state_ = DONE;
+      return Status::OK();
+    }
+  }
+
+  int64_t tuple_buffer_size;
+  RETURN_IF_ERROR(
+      row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, 
&tuple_mem_));
+  tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
+
+  if (scan_state_ == FIRST_TUPLE_FOUND) {
+    int num_tuples;
+    RETURN_IF_ERROR(ProcessRange(row_batch, &num_tuples));
+  }
+  if (scan_node_->ReachedLimit()) {
+    eos_ = true;
+    scan_state_ = DONE;
+    return Status::OK();
+  }
+  if (scan_state_ == PAST_SCAN_RANGE && !row_batch->AtCapacity()) {
+    RETURN_IF_ERROR(FinishScanRange(row_batch));
+    DCHECK_EQ(scan_state_, DONE);
+    eos_ = true;
+  }
+  return Status::OK();
+}
+
+Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int 
num_bytes) {
   *eosr = false;
 
-  if (decompressor_.get() == NULL) {
+  if (decompressor_.get() == nullptr) {
     Status status;
     if (num_bytes > 0) {
       stream_->GetBytes(num_bytes, 
reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
@@ -447,7 +488,7 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int 
num_bytes) {
     *eosr = stream_->eosr();
   } else if (decompressor_->supports_streaming()) {
     DCHECK_EQ(num_bytes, 0);
-    RETURN_IF_ERROR(FillByteBufferCompressedStream(eosr));
+    RETURN_IF_ERROR(FillByteBufferCompressedStream(pool, eosr));
   } else {
     DCHECK_EQ(num_bytes, 0);
     RETURN_IF_ERROR(FillByteBufferCompressedFile(eosr));
@@ -463,7 +504,7 @@ Status HdfsTextScanner::DecompressBufferStream(int64_t 
bytes_to_read,
   // decompress buffers that are read from stream_, so we don't need to read 
the
   // whole file in once. A compressed buffer is passed to ProcessBlockStreaming
   // but it may not consume all of the input.
-  uint8_t* compressed_buffer_ptr = NULL;
+  uint8_t* compressed_buffer_ptr = nullptr;
   int64_t compressed_buffer_size = 0;
   // We don't know how many bytes ProcessBlockStreaming() will consume so we 
set
   // peek=true and then later advance the stream using SkipBytes().
@@ -512,15 +553,18 @@ Status HdfsTextScanner::DecompressBufferStream(int64_t 
bytes_to_read,
   return Status::OK();
 }
 
-Status HdfsTextScanner::FillByteBufferCompressedStream(bool* eosr) {
-  // We're about to create a new decompression buffer (if we can't reuse). 
It's now
-  // safe to attach the current decompression buffer to batch_ because we know 
that
-  // it's the last row-batch that can possibly reference this buffer.
+Status HdfsTextScanner::FillByteBufferCompressedStream(MemPool* pool, bool* 
eosr) {
+  // We're about to create a new decompression buffer (if we can't reuse). 
Attach the
+  // memory from previous decompression rounds to 'pool'.
   if (!decompressor_->reuse_output_buffer()) {
-    RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), false));
+    if (pool != nullptr) {
+      pool->AcquireData(data_buffer_pool_.get(), false);
+    } else {
+      data_buffer_pool_->FreeAll();
+    }
   }
 
-  uint8_t* decompressed_buffer = NULL;
+  uint8_t* decompressed_buffer = nullptr;
   int64_t decompressed_len = 0;
   // Set bytes_to_read = -1 because we don't know how much data decompressor 
need.
   // Just read the first available buffer within the scan range.
@@ -543,7 +587,7 @@ Status 
HdfsTextScanner::FillByteBufferCompressedStream(bool* eosr) {
 
   if (*eosr) {
     DCHECK(stream_->eosr());
-    context_->ReleaseCompletedResources(NULL, true);
+    context_->ReleaseCompletedResources(nullptr, true);
   }
 
   return Status::OK();
@@ -579,7 +623,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* 
eosr) {
 
   // Decompress and adjust the byte_buffer_ptr_ and byte_buffer_read_size_ 
accordingly.
   int64_t decompressed_len = 0;
-  uint8_t* decompressed_buffer = NULL;
+  uint8_t* decompressed_buffer = nullptr;
   SCOPED_TIMER(decompress_timer_);
   // TODO: Once the writers are in, add tests with very large compressed files 
(4GB)
   // that could overflow.
@@ -588,7 +632,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* 
eosr) {
       &decompressed_buffer));
 
   // Inform 'stream_' that the buffer with the compressed text can be released.
-  context_->ReleaseCompletedResources(NULL, true);
+  context_->ReleaseCompletedResources(nullptr, true);
 
   VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << 
decompressed_len;
   byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
@@ -597,20 +641,22 @@ Status 
HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
   return Status::OK();
 }
 
-Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
-  *tuple_found = true;
+Status HdfsTextScanner::FindFirstTuple(MemPool* pool) {
+  DCHECK_EQ(scan_state_, SCAN_RANGE_INITIALIZED);
+
   // Either we're at the start of the file and thus skip all header lines, or 
we're in the
   // middle of the file and look for the next tuple.
+  bool tuple_found = true;
   int num_rows_to_skip = stream_->scan_range()->offset() == 0
       ? scan_node_->skip_header_line_count() : 1;
   if (num_rows_to_skip > 0) {
     int num_skipped_rows = 0;
-    *tuple_found = false;
     bool eosr = false;
+    tuple_found = false;
     // Offset maybe not point to a tuple boundary, skip ahead to the first 
tuple start in
     // this scan range (if one exists).
     do {
-      RETURN_IF_ERROR(FillByteBuffer(&eosr));
+      RETURN_IF_ERROR(FillByteBuffer(nullptr, &eosr));
 
       delimited_text_parser_->ParserReset();
       SCOPED_TIMER(parse_delimiter_timer_);
@@ -624,23 +670,23 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) 
{
         bytes_left -= next_tuple_offset;
         ++num_skipped_rows;
       }
-      if (next_tuple_offset != -1) *tuple_found = true;
-    } while (!*tuple_found && !eosr);
+      if (next_tuple_offset != -1) tuple_found = true;
+    } while (!tuple_found && !eosr);
 
     // Special case: if the first delimiter is at the end of the current 
buffer, it's
     // possible it's a split "\r\n" delimiter.
-    if (*tuple_found && byte_buffer_ptr_ == byte_buffer_end_) {
+    if (tuple_found && byte_buffer_ptr_ == byte_buffer_end_) {
       bool split_delimiter;
       RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
       if (split_delimiter) {
         if (eosr) {
           // Split delimiter at the end of the scan range. The next tuple is 
considered
           // part of the next scan range, so we report no tuple found.
-          *tuple_found = false;
+          tuple_found = false;
         } else {
           // Split delimiter at the end of the current buffer, but not eosr. 
Advance to
           // the correct position in the next buffer.
-          RETURN_IF_ERROR(FillByteBuffer(&eosr));
+          RETURN_IF_ERROR(FillByteBuffer(pool, &eosr));
           DCHECK_GT(byte_buffer_read_size_, 0);
           DCHECK_EQ(*byte_buffer_ptr_, '\n');
           byte_buffer_ptr_ += 1;
@@ -648,7 +694,7 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
       }
     }
     if (num_rows_to_skip > 1 && num_skipped_rows != num_rows_to_skip) {
-      DCHECK(!*tuple_found);
+      DCHECK(!tuple_found);
       stringstream ss;
       ss << "Could only skip " << num_skipped_rows << " header lines in first 
scan range "
          << "but expected " << num_rows_to_skip << ". Try increasing "
@@ -656,6 +702,7 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) {
       return Status(ss.str());
     }
   }
+  if (tuple_found) scan_state_ = FIRST_TUPLE_FOUND;
   DCHECK(delimited_text_parser_->AtTupleStart());
   return Status::OK();
 }
@@ -695,17 +742,17 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* 
split_delimiter) {
 // is then injected into the cross-compiled driving function, 
WriteAlignedTuples().
 Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** 
write_aligned_tuples_fn) {
-  *write_aligned_tuples_fn = NULL;
+  *write_aligned_tuples_fn = nullptr;
   DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != NULL);
+  DCHECK(codegen != nullptr);
   Function* write_complete_tuple_fn;
   RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
       &write_complete_tuple_fn));
-  DCHECK(write_complete_tuple_fn != NULL);
+  DCHECK(write_complete_tuple_fn != nullptr);
   RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, 
write_complete_tuple_fn,
       write_aligned_tuples_fn));
-  DCHECK(*write_aligned_tuples_fn != NULL);
+  DCHECK(*write_aligned_tuples_fn != nullptr);
   return Status::OK();
 }
 
@@ -721,22 +768,21 @@ Status HdfsTextScanner::Open(ScannerContext* context) {
   field_locations_.resize(state_->batch_size() * 
scan_node_->materialized_slots().size());
   row_end_locations_.resize(state_->batch_size());
 
-  // Allocate a new row batch. May fail if mem limit is exceeded.
-  RETURN_IF_ERROR(StartNewRowBatch());
+  // Reset state for new scan range
+  RETURN_IF_ERROR(InitNewRange());
   return Status::OK();
 }
 
-// This function writes fields in 'field_locations_' to the row_batch.  This 
function
-// deals with tuples that straddle batches.  There are two cases:
+// This function deals with tuples that straddle batches. There are two cases:
 // 1. There is already a partial tuple in flight from the previous time around.
-//   This tuple can either be fully materialized (all the materialized columns 
have
-//   been processed but we haven't seen the tuple delimiter yet) or only 
partially
-//   materialized.  In this case num_tuples can be greater than num_fields
+//    This tuple can either be fully materialized (all the materialized 
columns have
+//    been processed but we haven't seen the tuple delimiter yet) or only 
partially
+//    materialized.  In this case num_tuples can be greater than num_fields
 // 2. There is a non-fully materialized tuple at the end.  The cols that have 
been
-//   parsed so far are written to 'tuple_' and the remained will be picked up 
(case 1)
-//   the next time around.
-int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row,
-    int num_fields, int num_tuples) {
+//    parsed so far are written to 'tuple_' and the remaining ones will be 
picked up
+//    (case 1) the next time around.
+int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
+    TupleRow* row) {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
 
   FieldLocation* fields = &field_locations_[0];
@@ -745,7 +791,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* 
tuple_row,
   int num_tuples_materialized = 0;
   // Write remaining fields, if any, from the previous partial tuple.
   if (slot_idx_ != 0) {
-    DCHECK(tuple_ != NULL);
+    DCHECK(tuple_ != nullptr);
     int num_partial_fields = scan_node_->materialized_slots().size() - 
slot_idx_;
     // Corner case where there will be no materialized tuples but at least one 
col
     // worth of string data.  In this case, make a deep copy and reuse the 
byte buffer.
@@ -770,16 +816,16 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* 
tuple_row,
 
       memcpy(tuple_, partial_tuple_, scan_node_->tuple_desc()->byte_size());
       partial_tuple_empty_ = true;
-      tuple_row->SetTuple(scan_node_->tuple_idx(), tuple_);
+      row->SetTuple(scan_node_->tuple_idx(), tuple_);
 
       slot_idx_ = 0;
       ++num_tuples_processed;
       --num_tuples;
 
-      if (EvalConjuncts(tuple_row)) {
+      if (EvalConjuncts(row)) {
         ++num_tuples_materialized;
         tuple_ = next_tuple(tuple_byte_size_, tuple_);
-        tuple_row = next_row(tuple_row);
+        row = next_row(row);
       }
     }
 
@@ -793,18 +839,18 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* 
tuple_row,
         num_tuples : scan_node_->limit() - scan_node_->rows_returned();
     int tuples_returned = 0;
     // Call jitted function if possible
-    if (write_tuples_fn_ != NULL) {
+    if (write_tuples_fn_ != nullptr) {
       // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are 
string
       // slots and escape characters. TextConverter::WriteSlot() will be used 
instead.
       DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
           delimited_text_parser_->escape_char() == '\0');
-      tuples_returned = write_tuples_fn_(this, pool, tuple_row,
-          batch_->row_byte_size(), fields, num_tuples, max_added_tuples,
-          scan_node_->materialized_slots().size(), num_tuples_processed);
+      tuples_returned = write_tuples_fn_(this, pool, row, sizeof(Tuple*), 
fields,
+          num_tuples, max_added_tuples, 
scan_node_->materialized_slots().size(),
+          num_tuples_processed);
     } else {
-      tuples_returned = WriteAlignedTuples(pool, tuple_row,
-          batch_->row_byte_size(), fields, num_tuples, max_added_tuples,
-          scan_node_->materialized_slots().size(), num_tuples_processed);
+      tuples_returned = WriteAlignedTuples(pool, row, sizeof(Tuple*), fields,
+          num_tuples, max_added_tuples, 
scan_node_->materialized_slots().size(),
+          num_tuples_processed);
     }
     if (tuples_returned == -1) return 0;
     DCHECK_EQ(slot_idx_, 0);
@@ -819,7 +865,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* 
tuple_row,
 
   // Write out the remaining slots (resulting in a partially materialized 
tuple)
   if (num_fields != 0) {
-    DCHECK(tuple_ != NULL);
+    DCHECK(tuple_ != nullptr);
     InitTuple(template_tuple_, partial_tuple_);
     // If there have been no materialized tuples at this point, copy string 
data
     // out of byte_buffer and reuse the byte_buffer.  The copied data can be at
@@ -836,7 +882,7 @@ Status HdfsTextScanner::CopyBoundaryField(FieldLocation* 
data, MemPool* pool) {
   int copy_len = needs_escape ? -data->len : data->len;
   int64_t total_len = copy_len + boundary_column_.len();
   char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
-  if (UNLIKELY(str_data == NULL)) {
+  if (UNLIKELY(str_data == nullptr)) {
     string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed 
to allocate "
         "$0 bytes.", total_len);
     return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index e68d45f..042cd18 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -36,7 +36,7 @@ struct HdfsFileDesc;
 /// This scanner handles text files split across multiple blocks/scan ranges. 
Note that
 /// the split can occur anywhere in the file, e.g. in the middle of a row. 
Each scanner
 /// starts materializing tuples right after the first row delimiter found in 
the scan
-/// range, and stops at the first row delimiter occuring past the end of the 
scan
+/// range, and stops at the first row delimiter occurring past the end of the 
scan
 /// range. If no delimiter is found in the scan range, the scanner doesn't 
materialize
 /// anything. This scheme ensures that every row is materialized by exactly 
one scanner.
 ///
@@ -57,10 +57,10 @@ class HdfsTextScanner : public HdfsScanner {
 
   /// Issue io manager byte ranges for 'files'.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
-                                   const std::vector<HdfsFileDesc*>& files);
+      const std::vector<HdfsFileDesc*>& files);
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
-  /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
+  /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
       const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** write_aligned_tuples_fn);
@@ -71,6 +71,8 @@ class HdfsTextScanner : public HdfsScanner {
   static const char* LLVM_CLASS_NAME;
 
  protected:
+  virtual Status GetNextInternal(RowBatch* row_batch);
+
   /// Reset the scanner.  This clears any partial state that needs to
   /// be cleared when starting or when restarting after an error.
   Status ResetScanner();
@@ -90,33 +92,52 @@ class HdfsTextScanner : public HdfsScanner {
  private:
   const static int NEXT_BLOCK_READ_SIZE = 64 * 1024; //bytes
 
+  /// The text scanner transitions through these states exactly in order.
+  enum TextScanState {
+    CONSTRUCTED,
+    SCAN_RANGE_INITIALIZED,
+    FIRST_TUPLE_FOUND,
+    PAST_SCAN_RANGE,
+    DONE
+  };
+
   /// Initializes this scanner for this context.  The context maps to a single
-  /// scan range.
+  /// scan range. Advances the scan state to SCAN_RANGE_INITIALIZED.
   virtual Status InitNewRange();
 
   /// Finds the start of the first tuple in this scan range and initializes
-  /// byte_buffer_ptr to be the next character (the start of the first tuple). 
 If
-  /// there are no tuples starts in the entire range, *tuple_found is set to 
false
-  /// and no more processing needs to be done in this range (i.e. there are 
really large
-  /// columns)
-  Status FindFirstTuple(bool* tuple_found);
-
-  /// Process the entire scan range, reading bytes from context and appending
-  /// materialized row batches to the scan node.  *num_tuples returns the
-  /// number of tuples parsed.  past_scan_range is true if this is processing
-  /// beyond the end of the scan range and this function should stop after
-  /// finding one tuple.
-  Status ProcessRange(int* num_tuples, bool past_scan_range);
-
-  /// Reads past the end of the scan range for the next tuple end.
-  Status FinishScanRange();
+  /// 'byte_buffer_ptr_' to point to the start of first tuple. Advances the 
scan state
+  /// to FIRST_TUPLE_FOUND, if successful. Otherwise, consumes the whole scan 
range
+  /// and does not update the scan state (e.g. if there are really large 
columns).
+  /// Only valid to call in scan state SCAN_RANGE_INITIALIZED.
+  Status FindFirstTuple(MemPool* pool);
+
+  /// When in scan state FIRST_TUPLE_FOUND, starts or continues processing the 
scan range
+  /// by reading bytes from 'context_'. Adds materialized tuples that pass the 
conjuncts
+  /// to 'row_batch', and returns when 'row_batch' is at capacity.
+  /// When in scan state PAST_SCAN_RANGE, this function returns after parsing 
one tuple,
+  /// regardless of whether it passed the conjuncts.
+  /// *num_tuples returns the total number of tuples parsed, including tuples 
that did
+  /// not pass conjuncts.
+  /// Advances the scan state to PAST_SCAN_RANGE if all bytes in the scan 
range have been
+  /// processed.
+  /// Only valid to call in scan state FIRST_TUPLE_FOUND or PAST_SCAN_RANGE.
+  Status ProcessRange(RowBatch* row_batch, int* num_tuples);
+
+  /// Reads past the end of the scan range for the next tuple end. If 
successful,
+  /// advances the scan state to DONE. Only valid to call in state 
PAST_SCAN_RANGE.
+  Status FinishScanRange(RowBatch* row_batch);
 
   /// Fills the next byte buffer from the context.  This will block if there 
are no bytes
   /// ready.  Updates byte_buffer_ptr_, byte_buffer_end_ and 
byte_buffer_read_size_.
   /// If num_bytes is 0, the scanner will read whatever is the io mgr buffer 
size,
   /// otherwise it will just read num_bytes. If we are reading compressed 
text, num_bytes
-  /// must be 0.
-  virtual Status FillByteBuffer(bool* eosr, int num_bytes = 0);
+  /// must be 0. Internally, calls the appropriate streaming or non-streaming
+  /// decompression functions FillByteBufferCompressedFile/Stream().
+  /// If applicable, attaches decompression buffers from previous calls that 
might still
+  /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the 
buffers are
+  /// freed instead.
+  virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0);
 
   /// Fills the next byte buffer from the compressed data in stream_ by 
reading the entire
   /// file, decompressing it, and setting the byte_buffer_ptr_ to the 
decompressed buffer.
@@ -126,7 +147,9 @@ class HdfsTextScanner : public HdfsScanner {
   /// FillByteBufferCompressedFile(), the entire file does not need to be read 
at once.
   /// Buffers from stream_ are decompressed as they are read and 
byte_buffer_ptr_ is set
   /// to available decompressed data.
-  Status FillByteBufferCompressedStream(bool* eosr);
+  /// Attaches decompression buffers from previous calls that might still be 
referenced
+  /// by returned batches to 'pool'. If 'pool' is nullptr the buffers are 
freed instead.
+  Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr);
 
   /// Used by FillByteBufferCompressedStream() to decompress data from 
'stream_'.
   /// Returns COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS if it needs more input.
@@ -150,21 +173,24 @@ class HdfsTextScanner : public HdfsScanner {
   /// memory limit is exceeded when allocating a new string.
   Status CopyBoundaryField(FieldLocation* data, MemPool* pool);
 
-  /// Writes the intermediate parsed data into slots, outputting
-  /// tuples to row_batch as they complete.
+  /// Writes intermediate parsed data into 'tuple_', evaluates conjuncts, and 
appends
+  /// surviving rows to 'row'. Advances 'tuple_' and 'row' as necessary.
   /// Input Parameters:
-  ///  mempool: MemPool to allocate from for field data
   ///  num_fields: Total number of fields contained in parsed_data_
   ///  num_tuples: Number of tuples in parsed_data_. This includes the 
potential
   ///    partial tuple at the beginning of 'field_locations_'.
-  /// Returns the number of tuples added to the row batch.
-  int WriteFields(MemPool*, TupleRow* tuple_row_mem, int num_fields, int 
num_tuples);
+  ///  pool: MemPool to allocate from for field data
+  /// Returns the number of rows added to the row batch.
+  int WriteFields(int num_fields, int num_tuples, MemPool* pool, TupleRow* 
row);
 
   /// Utility function to write out 'num_fields' to 'tuple_'.  This is used to 
parse
   /// partial tuples. If copy_strings is true, strings from fields will be 
copied into
   /// the boundary pool.
   void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
 
+  /// Current state of this scanner. Advances through the states exactly in 
order.
+  TextScanState scan_state_;
+
   /// Mem pool for boundary_row_ and boundary_column_.
   boost::scoped_ptr<MemPool> boundary_pool_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 66f112d..335c921 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -92,7 +92,7 @@ ScannerContext::Stream* 
ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
 }
 
 void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool 
done) {
-  DCHECK(batch != nullptr || done);
+  DCHECK(batch != nullptr || done || !contains_tuple_data_);
   if (done) {
     // Mark any pending resources as completed
     if (io_buffer_ != nullptr) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 0f4e36f..ff3cfa8 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -257,8 +257,9 @@ class ScannerContext {
     /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches 
all completed
     /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all 
in-flight
     /// resources are also attached or released.
-    /// If 'batch' is NULL then 'done' must be true. Such a call will release 
all
-    /// completed and in-flight resources.
+    /// If 'batch' is NULL then 'done' must be true or 'contains_tuple_data_' 
false. Such
+    /// a call will release all completed resources. If 'done' is true all 
in-flight
+    /// resources are also freed.
     void ReleaseCompletedResources(RowBatch* batch, bool done);
 
     /// Error-reporting functions.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 50d6bfe..cb6627f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -221,7 +221,9 @@ public class HdfsScanNode extends ScanNode {
     // is currently only supported for Parquet.
     if (analyzer.getQueryOptions().isSetMt_dop() &&
         analyzer.getQueryOptions().mt_dop > 0 &&
-        fileFormats.size() == 1 && 
fileFormats.contains(HdfsFileFormat.PARQUET)) {
+        fileFormats.size() == 1 &&
+        (fileFormats.contains(HdfsFileFormat.PARQUET)
+          || fileFormats.contains(HdfsFileFormat.TEXT))) {
       useMtScanNode_ = true;
     } else {
       useMtScanNode_ = false;
