Repository: incubator-impala
Updated Branches:
  refs/heads/master 796db0fce -> 9ce691af2
IMPALA-5210: Count rows and collection items in parquet scanner separately

This patch adds a collection_items_read_counter to the scan node, makes
rows_read_counter count top-level rows only, and updates these counters
less frequently. When scanning nested columns, the current code counts
both top-level rows and nested rows in rows_read_counter, which is
inconsistent with rows_returned_counter. Furthermore, rows_read_counter
is updated eagerly whenever a batch of collection items is read. As a
result, around 10% of the scanning time is spent updating the counter
for the following simple query:

>select count(*) from
>  customer c,
>  c.c_orders o,
>  o.o_lineitems l
>where
>  c_mktsegment = 'BUILDING'
>  and o_orderdate < '1995-03-15'
>  and l_shipdate > '1995-03-15'
>  and o_orderkey = 10;

This patch moves collection item counting into
collection_items_read_counter. Both counters are now updated once per
row batch read. For the query above, scanning time decreased by 10.4%.

Change-Id: I7f6efddaea18507482940f5bdab7326b6482b067
Reviewed-on: http://gerrit.cloudera.org:8080/7776
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/915d30f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/915d30f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/915d30f4

Branch: refs/heads/master
Commit: 915d30f4bb6334e9d816e080722484537732fe3d
Parents: 796db0f
Author: Tianyi Wang <[email protected]>
Authored: Thu Aug 24 10:47:28 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Sep 6 00:09:40 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 22 ++++++++++++----------
 be/src/exec/hdfs-parquet-scanner.h  |  9 ++++++++-
 be/src/exec/scan-node.cc            |  3 +++
 be/src/exec/scan-node.h             | 11 ++++++++++-
 4 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ecf14f8..03b9c70 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -171,6 +171,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     num_row_groups_counter_(NULL),
     num_scanners_with_no_reads_counter_(NULL),
     num_dict_filtered_row_groups_counter_(NULL),
+    coll_items_read_counter_(0),
     codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
@@ -958,6 +959,7 @@ Status HdfsParquetScanner::AssembleRows(
   DCHECK_EQ(*skip_row_group, false);
   DCHECK(scratch_batch_ != NULL);
 
+  int64_t num_rows_read = 0;
   while (!column_readers[0]->RowGroupAtEnd()) {
     // Start a new scratch batch.
     RETURN_IF_ERROR(scratch_batch_->Reset(state_));
 
     // Materialize the top-level slots into the scratch batch column-by-column.
     int last_num_tuples = -1;
-    int num_col_readers = column_readers.size();
-    bool continue_execution = true;
-    for (int c = 0; c < num_col_readers; ++c) {
+    for (int c = 0; c < column_readers.size(); ++c) {
       ParquetColumnReader* col_reader = column_readers[c];
+      bool continue_execution;
       if (col_reader->max_rep_level() > 0) {
         continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
             scratch_batch_->capacity, tuple_byte_size_, scratch_batch_->tuple_mem,
@@ -997,14 +998,16 @@ Status HdfsParquetScanner::AssembleRows(
       }
       last_num_tuples = scratch_batch_->num_tuples;
     }
-    row_group_rows_read_ += scratch_batch_->num_tuples;
-    COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples);
-
+    num_rows_read += scratch_batch_->num_tuples;
     int num_row_to_commit = TransferScratchTuples(row_batch);
     RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
-    if (row_batch->AtCapacity()) return Status::OK();
+    if (row_batch->AtCapacity()) break;
   }
-
+  row_group_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  // Merge Scanner-local counter into HdfsScanNode counter and reset.
+  COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
+  coll_items_read_counter_ = 0;
   return Status::OK();
 }
 
@@ -1265,11 +1268,10 @@ bool HdfsParquetScanner::AssembleCollection(
     }
 
     rows_read += row_idx;
-    COUNTER_ADD(scan_node_->rows_read_counter(), row_idx);
     coll_value_builder->CommitTuples(num_to_commit);
     continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
   }
-
+  coll_items_read_counter_ += rows_read;
   if (end_of_collection) {
     // All column readers should report the start of the same collection.
     for (int c = 1; c < column_readers.size(); ++c) {


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index b31d321..5f67036 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -474,6 +474,12 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of row groups skipped due to dictionary filter
   RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
 
+  /// Number of collection items read in current row batch. It is a scanner-local counter
+  /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
+  /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
+  /// AssembleRows() and then is reset to 0.
+  int64_t coll_items_read_counter_;
+
   typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
   /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
   ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
@@ -545,7 +551,8 @@ class HdfsParquetScanner : public HdfsScanner {
       WARN_UNUSED_RESULT;
 
   /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
-  /// allocated from 'coll_value_builder'.
+  /// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the
+  /// number of items in this collection and descendant collections.
   ///
   /// 'new_collection_rep_level' indicates when the end of the collection has been
   /// reached, namely when current_rep_level <= new_collection_rep_level.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 938ce6e..0df0c3f 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -28,6 +28,7 @@ namespace impala {
 // Changing these names have compatibility concerns.
 const string ScanNode::BYTES_READ_COUNTER = "BytesRead";
 const string ScanNode::ROWS_READ_COUNTER = "RowsRead";
+const string ScanNode::COLLECTION_ITEMS_READ_COUNTER = "CollectionItemsRead";
 const string ScanNode::TOTAL_HDFS_READ_TIMER = "TotalRawHdfsReadTime(*)";
 const string ScanNode::TOTAL_HBASE_READ_TIMER = "TotalRawHBaseReadTime(*)";
 const string ScanNode::TOTAL_THROUGHPUT_COUNTER = "TotalReadThroughput";
@@ -58,6 +59,8 @@ Status ScanNode::Prepare(RuntimeState* state) {
       BYTES_READ_COUNTER, bytes_read_counter_);
   rows_read_counter_ =
       ADD_COUNTER(runtime_profile(), ROWS_READ_COUNTER, TUnit::UNIT);
+  collection_items_read_counter_ =
+      ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT);
   total_throughput_counter_ = runtime_profile()->AddRateCounter(
       TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
   materialize_tuple_timer_ = ADD_CHILD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER,


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 87b9c71..0f73c2b 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -97,6 +97,9 @@ class ScanNode : public ExecNode {
   RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; }
   RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; }
+  RuntimeProfile::Counter* collection_items_read_counter() const {
+    return collection_items_read_counter_;
+  }
   RuntimeProfile::Counter* read_timer() const { return read_timer_; }
   RuntimeProfile::Counter* total_throughput_counter() const {
     return total_throughput_counter_;
@@ -123,6 +126,7 @@ class ScanNode : public ExecNode {
   /// names of ScanNode common counters
   static const std::string BYTES_READ_COUNTER;
   static const std::string ROWS_READ_COUNTER;
+  static const std::string COLLECTION_ITEMS_READ_COUNTER;
   static const std::string TOTAL_HDFS_READ_TIMER;
   static const std::string TOTAL_HBASE_READ_TIMER;
   static const std::string TOTAL_THROUGHPUT_COUNTER;
@@ -143,8 +147,13 @@ class ScanNode : public ExecNode {
   RuntimeProfile::Counter* bytes_read_counter_; // # bytes read from the scanner
   /// Time series of the bytes_read_counter_
   RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_;
-  /// # rows/tuples read from the scanner (including those discarded by EvalConjucts())
+  /// # top-level rows/tuples read from the scanner
+  /// (including those discarded by EvalConjucts())
   RuntimeProfile::Counter* rows_read_counter_;
+  /// # items the scanner read into CollectionValues. For example, for schema
+  /// array<struct<B: INT, array<C: INT>> and tuple
+  /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
+  RuntimeProfile::Counter* collection_items_read_counter_;
   RuntimeProfile::Counter* read_timer_; // total read time
   /// Wall based aggregate read throughput [bytes/sec]
   RuntimeProfile::Counter* total_throughput_counter_;
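
----------------------------------------------------------------------

The core pattern in this patch, accumulating counts in a cheap
scanner-local field and merging them into the shared profile counter
once per row batch, can be illustrated with a minimal standalone C++
sketch. This is not Impala code: SharedCounter, Scanner, and the other
names below are hypothetical stand-ins for RuntimeProfile::Counter and
HdfsParquetScanner, written only to show the accumulate-then-merge
idea.

#include <atomic>
#include <cstdint>
#include <vector>

// Stand-in for a shared profile counter: thread-safe, and therefore
// comparatively expensive to update from a tight per-item loop.
class SharedCounter {
 public:
  void Add(int64_t delta) { value_.fetch_add(delta, std::memory_order_relaxed); }
  int64_t value() const { return value_.load(std::memory_order_relaxed); }

 private:
  std::atomic<int64_t> value_{0};
};

// Sketch of a scanner: the nested-read helper only bumps a plain local
// member, and the local total is merged into the shared counter once
// per AssembleRows() call and then reset, mirroring the patch.
class Scanner {
 public:
  explicit Scanner(SharedCounter* coll_items_read)
      : coll_items_read_(coll_items_read) {}

  void AssembleRows(const std::vector<std::vector<int>>& collections) {
    for (const auto& collection : collections) AssembleCollection(collection);
    // Merge the scanner-local counter into the shared counter and reset.
    coll_items_read_->Add(local_coll_items_read_);
    local_coll_items_read_ = 0;
  }

 private:
  void AssembleCollection(const std::vector<int>& items) {
    // Per-item work happens here; only the cheap local field is touched.
    local_coll_items_read_ += static_cast<int64_t>(items.size());
  }

  SharedCounter* coll_items_read_;
  int64_t local_coll_items_read_ = 0;
};

int main() {
  SharedCounter counter;
  Scanner scanner(&counter);
  scanner.AssembleRows({{1, 2, 3}, {4, 5}});
  return counter.value() == 5 ? 0 : 1;  // shared counter updated exactly once
}

The design point is the one the commit message measures: the shared
counter is touched once per batch instead of once per collection, so
the update cost drops out of the hot loop.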

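The new CollectionItemsRead counter counts items at every nesting
depth, per the example in the scan-node.h comment above. The following
standalone sketch (the Item type is hypothetical, not Impala's
CollectionValue API) reproduces that example, where the value
[(2, [(3)]), (4, [])] yields a count of 3:

#include <cstdint>
#include <vector>

// Hypothetical stand-in for one materialized item of an array of
// structs, each holding a scalar plus a nested collection.
struct Item {
  int value;
  std::vector<Item> children;
};

// Counts every item read into any collection, at all nesting depths.
int64_t CountCollectionItems(const std::vector<Item>& collection) {
  int64_t n = 0;
  for (const Item& item : collection) {
    n += 1 + CountCollectionItems(item.children);
  }
  return n;
}

int main() {
  // [(2, [(3)]), (4, [])]: the three counted items are (2, [(3)]),
  // its nested child (3), and (4, []).
  std::vector<Item> data = {{2, {{3, {}}}}, {4, {}}};
  return CountCollectionItems(data) == 3 ? 0 : 1;
}

Top-level rows are deliberately excluded from this count; in the patch
they go to RowsRead instead, which keeps RowsRead consistent with
rows_returned_counter.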