IMPALA-3662: Don't double allocate tuple buffer in parquet scanner

HdfsScanner::StartNewRowBatch() is called once per row batch by the parquet scanner to allocate a new row batch and tuple buffer. Similarly, a scratch batch, which also contains a tuple buffer, is created for each row batch in HdfsParquetScanner::AssembleRows(). In reality, only the tuple buffer in the scratch batch is used, so the tuple buffer allocated by HdfsScanner::StartNewRowBatch() is unused memory for the parquet scanner.
This change fixes the problem above by implementing HdfsParquetScanner::StartNewParquetRowBatch(), which creates a new row batch without allocating the tuple buffer. With this patch, the memory consumption when materializing very wide tuples is reduced by half. Change-Id: I826061a2be10fd0528ca4dd1e97146e3cb983370 Reviewed-on: http://gerrit.cloudera.org:8080/4064 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1522da35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1522da35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1522da35 Branch: refs/heads/master Commit: 1522da3510a36635e3fc694b26211554fcd2793a Parents: 02608f8 Author: Michael Ho <[email protected]> Authored: Thu Aug 18 22:31:34 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sat Aug 20 03:03:10 2016 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-parquet-scanner.cc | 10 ++++++++-- be/src/exec/hdfs-parquet-scanner.h | 5 +++++ be/src/exec/hdfs-scanner.h | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index ee5f4d9..2aaf8de 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -301,13 +301,13 @@ Status HdfsParquetScanner::ProcessSplit() { DCHECK(add_batches_to_queue_); bool scanner_eos = false; do { - RETURN_IF_ERROR(StartNewRowBatch()); + StartNewParquetRowBatch(); RETURN_IF_ERROR(GetNextInternal(batch_, &scanner_eos)); scan_node_->AddMaterializedRowBatch(batch_); } while 
(!scanner_eos && !scan_node_->ReachedLimit()); // Transfer the remaining resources to this new batch in Close(). - RETURN_IF_ERROR(StartNewRowBatch()); + StartNewParquetRowBatch(); return Status::OK(); } @@ -529,6 +529,12 @@ Status HdfsParquetScanner::AssembleRows( return Status::OK(); } +void HdfsParquetScanner::StartNewParquetRowBatch() { + DCHECK(add_batches_to_queue_); + batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), + scan_node_->mem_tracker()); +} + Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) { DCHECK(dst_batch != NULL); dst_batch->CommitRows(num_rows); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 1df09a4..dfd7785 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -446,6 +446,11 @@ class HdfsParquetScanner : public HdfsScanner { Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch, bool* skip_row_group); + /// Set 'batch_' to a new row batch. Unlike the similarly named function in + /// HdfsScanner, this function will not allocate the tuple buffer. Only valid + /// to call if 'add_batches_to_queue_' is true. + void StartNewParquetRowBatch(); + /// Commit num_rows to the given row batch. /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits. 
/// Scanner can call this with 0 rows to flush any pending resources (attached pools http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 53711ab..7ddf0a5 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -282,7 +282,7 @@ class HdfsScanner { Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, THdfsFileFormat::type type, const std::string& scanner_name); - /// Set batch_ to a new row batch and update tuple_mem_ accordingly. + /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly. /// Only valid to call if 'add_batches_to_queue_' is true. Status StartNewRowBatch();
