Repository: incubator-impala
Updated Branches:
  refs/heads/master 796db0fce -> 9ce691af2


IMPALA-5210: Count rows and collection items in parquet scanner separately

This patch adds a collection_items_read_counter to the scan node, makes
rows_read_counter count top-level rows only, and updates both counters
less frequently.
When scanning nested columns, the current code counts both top-level
rows and nested rows in rows_read_counter, which is inconsistent with
rows_returned_counter. Furthermore, rows_read_counter is updated eagerly
whenever a batch of collection items is read. As a result, roughly 10%
of the scan time is spent updating the counter for the following simple
query:
>select count(*) from
> customer c,
> c.c_orders o,
> o.o_lineitems l
>where
> c_mktsegment = 'BUILDING'
> and o_orderdate < '1995-03-15'
> and l_shipdate > '1995-03-15' and o_orderkey = 10;

This patch moves the counting of collection items into
collection_items_read_counter. Both counters are now updated once per
row batch read. For the query above, this reduces scanning time by
10.4%.
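
The underlying pattern is general: accumulate into a plain scanner-local
variable on the hot path and fold the total into the shared profile
counter once per row batch. Below is a minimal sketch of that idea;
SharedCounter and LocalScanner are hypothetical stand-ins for
illustration, not the actual Impala classes:

  #include <atomic>
  #include <cstdint>

  // Stand-in for a shared, thread-safe profile counter.
  struct SharedCounter {
    std::atomic<int64_t> value{0};
    void Add(int64_t delta) { value.fetch_add(delta, std::memory_order_relaxed); }
  };

  // Hypothetical scanner: counts items locally, merges once per row batch.
  class LocalScanner {
   public:
    explicit LocalScanner(SharedCounter* items_read) : items_read_(items_read) {}

    // Hot path: a plain increment, no shared-counter update per read.
    void ReadCollectionItems(int64_t num_items) { local_items_ += num_items; }

    // Called once per assembled row batch: merge into the shared counter
    // and reset the local accumulator, mirroring the end of AssembleRows().
    void FlushCounters() {
      items_read_->Add(local_items_);
      local_items_ = 0;
    }

   private:
    SharedCounter* items_read_;
    int64_t local_items_ = 0;
  };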

Change-Id: I7f6efddaea18507482940f5bdab7326b6482b067
Reviewed-on: http://gerrit.cloudera.org:8080/7776
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/915d30f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/915d30f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/915d30f4

Branch: refs/heads/master
Commit: 915d30f4bb6334e9d816e080722484537732fe3d
Parents: 796db0f
Author: Tianyi Wang <[email protected]>
Authored: Thu Aug 24 10:47:28 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Sep 6 00:09:40 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 22 ++++++++++++----------
 be/src/exec/hdfs-parquet-scanner.h  |  9 ++++++++-
 be/src/exec/scan-node.cc            |  3 +++
 be/src/exec/scan-node.h             | 11 ++++++++++-
 4 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ecf14f8..03b9c70 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -171,6 +171,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     num_row_groups_counter_(NULL),
     num_scanners_with_no_reads_counter_(NULL),
     num_dict_filtered_row_groups_counter_(NULL),
+    coll_items_read_counter_(0),
     codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
@@ -958,6 +959,7 @@ Status HdfsParquetScanner::AssembleRows(
   DCHECK_EQ(*skip_row_group, false);
   DCHECK(scratch_batch_ != NULL);
 
+  int64_t num_rows_read = 0;
   while (!column_readers[0]->RowGroupAtEnd()) {
     // Start a new scratch batch.
     RETURN_IF_ERROR(scratch_batch_->Reset(state_));
@@ -965,10 +967,9 @@ Status HdfsParquetScanner::AssembleRows(
 
     // Materialize the top-level slots into the scratch batch column-by-column.
     int last_num_tuples = -1;
-    int num_col_readers = column_readers.size();
-    bool continue_execution = true;
-    for (int c = 0; c < num_col_readers; ++c) {
+    for (int c = 0; c < column_readers.size(); ++c) {
       ParquetColumnReader* col_reader = column_readers[c];
+      bool continue_execution;
       if (col_reader->max_rep_level() > 0) {
        continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
            scratch_batch_->capacity, tuple_byte_size_, scratch_batch_->tuple_mem,
@@ -997,14 +998,16 @@ Status HdfsParquetScanner::AssembleRows(
       }
       last_num_tuples = scratch_batch_->num_tuples;
     }
-    row_group_rows_read_ += scratch_batch_->num_tuples;
-    COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples);
-
+    num_rows_read += scratch_batch_->num_tuples;
     int num_row_to_commit = TransferScratchTuples(row_batch);
     RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
-    if (row_batch->AtCapacity()) return Status::OK();
+    if (row_batch->AtCapacity()) break;
   }
-
+  row_group_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  // Merge Scanner-local counter into HdfsScanNode counter and reset.
+  COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
+  coll_items_read_counter_ = 0;
   return Status::OK();
 }
 
@@ -1265,11 +1268,10 @@ bool HdfsParquetScanner::AssembleCollection(
     }
 
     rows_read += row_idx;
-    COUNTER_ADD(scan_node_->rows_read_counter(), row_idx);
     coll_value_builder->CommitTuples(num_to_commit);
    continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
   }
-
+  coll_items_read_counter_ += rows_read;
   if (end_of_collection) {
     // All column readers should report the start of the same collection.
     for (int c = 1; c < column_readers.size(); ++c) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index b31d321..5f67036 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -474,6 +474,12 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of row groups skipped due to dictionary filter
   RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
 
+  /// Number of collection items read in current row batch. It is a scanner-local counter
+  /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
+  /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
+  /// AssembleRows() and then is reset to 0.
+  int64_t coll_items_read_counter_;
+
   typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
  /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
   ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
@@ -545,7 +551,8 @@ class HdfsParquetScanner : public HdfsScanner {
       WARN_UNUSED_RESULT;
 
  /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
-  /// allocated from 'coll_value_builder'.
+  /// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the
+  /// number of items in this collection and descendant collections.
   ///
  /// 'new_collection_rep_level' indicates when the end of the collection has been
   /// reached, namely when current_rep_level <= new_collection_rep_level.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 938ce6e..0df0c3f 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -28,6 +28,7 @@ namespace impala {
 // Changing these names have compatibility concerns.
 const string ScanNode::BYTES_READ_COUNTER = "BytesRead";
 const string ScanNode::ROWS_READ_COUNTER = "RowsRead";
+const string ScanNode::COLLECTION_ITEMS_READ_COUNTER = "CollectionItemsRead";
 const string ScanNode::TOTAL_HDFS_READ_TIMER = "TotalRawHdfsReadTime(*)";
 const string ScanNode::TOTAL_HBASE_READ_TIMER = "TotalRawHBaseReadTime(*)";
 const string ScanNode::TOTAL_THROUGHPUT_COUNTER = "TotalReadThroughput";
@@ -58,6 +59,8 @@ Status ScanNode::Prepare(RuntimeState* state) {
       BYTES_READ_COUNTER, bytes_read_counter_);
   rows_read_counter_ =
       ADD_COUNTER(runtime_profile(), ROWS_READ_COUNTER, TUnit::UNIT);
+  collection_items_read_counter_ =
+      ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT);
   total_throughput_counter_ = runtime_profile()->AddRateCounter(
       TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
  materialize_tuple_timer_ = ADD_CHILD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/915d30f4/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 87b9c71..0f73c2b 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -97,6 +97,9 @@ class ScanNode : public ExecNode {
 
  RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; }
  RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; }
+  RuntimeProfile::Counter* collection_items_read_counter() const {
+    return collection_items_read_counter_;
+  }
   RuntimeProfile::Counter* read_timer() const { return read_timer_; }
   RuntimeProfile::Counter* total_throughput_counter() const {
     return total_throughput_counter_;
@@ -123,6 +126,7 @@ class ScanNode : public ExecNode {
   /// names of ScanNode common counters
   static const std::string BYTES_READ_COUNTER;
   static const std::string ROWS_READ_COUNTER;
+  static const std::string COLLECTION_ITEMS_READ_COUNTER;
   static const std::string TOTAL_HDFS_READ_TIMER;
   static const std::string TOTAL_HBASE_READ_TIMER;
   static const std::string TOTAL_THROUGHPUT_COUNTER;
@@ -143,8 +147,13 @@ class ScanNode : public ExecNode {
  RuntimeProfile::Counter* bytes_read_counter_; // # bytes read from the scanner
   /// Time series of the bytes_read_counter_
   RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_;
-  /// # rows/tuples read from the scanner (including those discarded by EvalConjucts())
+  /// # top-level rows/tuples read from the scanner
+  /// (including those discarded by EvalConjucts())
   RuntimeProfile::Counter* rows_read_counter_;
+  /// # items the scanner read into CollectionValues. For example, for schema
+  /// array<struct<B: INT, array<C: INT>> and tuple
+  /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
+  RuntimeProfile::Counter* collection_items_read_counter_;
   RuntimeProfile::Counter* read_timer_; // total read time
   /// Wall based aggregate read throughput [bytes/sec]
   RuntimeProfile::Counter* total_throughput_counter_;
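
As a cross-check of the counting semantics documented in the comment
above, here is a minimal recursive sketch; the Item type is hypothetical
and stands in for Impala's materialized collection values:

  #include <cstdint>
  #include <vector>

  // Hypothetical nested value: an item may carry child collections.
  struct Item {
    std::vector<std::vector<Item>> children;
  };

  // Counts every item materialized into any collection, at any depth.
  int64_t CountCollectionItems(const std::vector<Item>& collection) {
    int64_t count = static_cast<int64_t>(collection.size());
    for (const Item& item : collection) {
      for (const auto& child : item.children) {
        count += CountCollectionItems(child);
      }
    }
    return count;
  }

  // For the value [(2, [(3)]), (4, [])] from the comment in the diff:
  // the outer collection contributes 2 items, the first item's child
  // collection contributes 1, and the second item's empty child
  // collection contributes 0, giving the documented total of 3.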
