IMPALA-4893: Efficiently update the rows read counter for sequence files

Update the rows read counter after processing the scan range instead of
updating it after reading every row for sequence files to save CPU cycles.
Change-Id: Ie42c97a36e46172884cc497aa645036c2c11f541 Reviewed-on: http://gerrit.cloudera.org:8080/6522 Reviewed-by: Bharath Vissapragada <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5809317c Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5809317c Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5809317c Branch: refs/heads/master Commit: 5809317c9a202159ac28e0565d04151eae843d09 Parents: 59b2db6 Author: aphadke <[email protected]> Authored: Thu Mar 30 18:05:52 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Apr 26 01:12:01 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-sequence-scanner.cc | 7 ++++--- .../queries/QueryTest/hdfs_scanner_profile.test | 8 ++++++++ tests/query_test/test_scanners.py | 7 +++++++ 3 files changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5809317c/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index d802bd7..275f96b 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -294,6 +294,7 @@ Status HdfsSequenceScanner::ProcessRange() { // We count the time here since there is too much overhead to do // this on each record. 
SCOPED_TIMER(scan_node_->materialize_tuple_timer()); + int64_t num_rows_read = 0; while (!finished()) { DCHECK_GT(record_locations_.size(), 0); @@ -336,13 +337,12 @@ Status HdfsSequenceScanner::ProcessRange() { } else { add_row = WriteTemplateTuples(tuple_row_mem, 1); } - - COUNTER_ADD(scan_node_->rows_read_counter(), 1); + num_rows_read++; if (add_row) RETURN_IF_ERROR(CommitRows(1)); if (scan_node_->ReachedLimit()) break; // Sequence files don't end with syncs - if (stream_->eof()) return Status::OK(); + if (stream_->eof()) break; // Check for sync by looking for the marker that precedes syncs. int marker; @@ -353,6 +353,7 @@ Status HdfsSequenceScanner::ProcessRange() { } } + COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5809317c/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test new file mode 100644 index 0000000..ea459e4 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test @@ -0,0 +1,8 @@ +==== +---- QUERY +# This query will do a full table scan to count the num of rows +# read during a scan +select * from alltypesagg +---- RUNTIME_PROFILE +row_regex: .*RowsRead: 11.00K . 
+==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5809317c/tests/query_test/test_scanners.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 8ba2e0b..b0e2e80 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -68,6 +68,13 @@ class TestScannersAllTableFormats(ImpalaTestSuite): new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size') self.run_test_case('QueryTest/scanners', new_vector) + def test_hdfs_scanner_profile(self, vector): + new_vector = deepcopy(vector) + new_vector.get_value('exec_option')['num_nodes'] = 1 + if new_vector.get_value('table_format').file_format in ('kudu', 'hbase'): + pytest.skip() + self.run_test_case('QueryTest/hdfs_scanner_profile', new_vector) + # Test all the scanners with a simple limit clause. The limit clause triggers # cancellation in the scanner code paths. class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):
