IMPALA-3002/IMPALA-1473: Cardinality observability cleanup

IMPALA-3002:
The shell prints an incorrect value for '#Rows' in the exec
summary for broadcast nodes due to incorrect logic around
whether to use max or agg stats. This patch makes the behavior
consistent with the way the backend (be) treats exec summaries in
summary-util.cc. This incorrect logic was also duplicated in
the impala_beeswax test framework.

IMPALA-1473:
When there is a merging exchange with a limit, we may copy rows
into the output batch beyond the limit. In this case, we currently
update the output batch's size to reflect the limit, but we also
need to update ExecNode::num_rows_returned_ or the exec summary
may show that the exchange node returned more rows than it really
did.

Additionally, PlanFragmentExecutor::GetNext does not update
rows_produced_counter_ in some cases, leading the runtime profile
to display an incorrect value for 'RowsProduced'.

Change-Id: I386719370386c9cff09b8b35d15dc712dc6480aa
Reviewed-on: http://gerrit.cloudera.org:8080/4679
Reviewed-by: Matthew Jacobs <m...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7fad3e5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7fad3e5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7fad3e5d

Branch: refs/heads/hadoop-next
Commit: 7fad3e5dc38c1097db6be24da0cda6941f554150
Parents: a1c9cb3
Author: Thomas Tauber-Marshall <tmarsh...@cloudera.com>
Authored: Mon Oct 10 10:32:55 2016 -0700
Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org>
Committed: Sat Oct 15 01:25:51 2016 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc             |  1 +
 be/src/runtime/plan-fragment-executor.cc |  2 +-
 shell/impala_client.py                   |  5 ++-
 tests/beeswax/impala_beeswax.py          | 10 +++---
 tests/query_test/test_observability.py   | 52 +++++++++++++++++++++++++++
 5 files changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 22dfe40..833949b 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -207,6 +207,7 @@ Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch,
   num_rows_returned_ += output_batch->num_rows();
   if (ReachedLimit()) {
     output_batch->set_num_rows(output_batch->num_rows() - (num_rows_returned_ - limit_));
+    num_rows_returned_ = limit_;
     *eos = true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index e0d314b..aba4a26 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -410,6 +410,7 @@ Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
     row_batch_->Reset();
   }
   UpdateStatus(status);
+  COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
 
   if (done_) {
     VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_)
@@ -421,7 +422,6 @@ Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
   }
 
   *batch = row_batch_.get();
-  COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index bc20b09..0d1c835 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -115,6 +115,9 @@ class ImpalaClient(object):
 
     Returns the index of the next exec node in summary.exec_nodes that should be
     processed, used internally to this method only.
+
+    NOTE: This is duplicated in impala_beeswax.py, and changes made here should also be
+    made there.
     """
     attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
 
@@ -142,7 +145,7 @@ class ImpalaClient(object):
     # is the max over all instances (which should all have received the same number of
     # rows). Otherwise, the cardinality is the sum over all instances which process
     # disjoint partitions.
-    if node.is_broadcast and is_fragment_root:
+    if node.is_broadcast:
       cardinality = max_stats.cardinality
     else:
       cardinality = agg_stats.cardinality

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/tests/beeswax/impala_beeswax.py
----------------------------------------------------------------------
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 79a106f..e0f5d55 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -30,15 +30,15 @@ import shlex
 import getpass
 import re
 
-from impala._thrift_gen.beeswax import BeeswaxService
-from impala._thrift_gen.beeswax.BeeswaxService import QueryState
+from beeswaxd import BeeswaxService
+from beeswaxd.BeeswaxService import QueryState
 from datetime import datetime
 try:
   # If Exec Summary is not implemented in Impala, this cannot be imported
-  from impala._thrift_gen.ExecStats.ttypes import TExecStats
+  from ExecStats.ttypes import TExecStats
 except ImportError:
   pass
-from impala._thrift_gen.ImpalaService import ImpalaService
+from ImpalaService import ImpalaService
 from tests.util.thrift_util import create_transport
 from thrift.transport.TTransport import TTransportException
 from thrift.protocol import TBinaryProtocol
@@ -265,7 +265,7 @@ class ImpalaBeeswaxClient(object):
     # is the max over all instances (which should all have received the same number of
     # rows). Otherwise, the cardinality is the sum over all instances which process
     # disjoint partitions.
-    if node.is_broadcast and is_fragment_root:
+    if node.is_broadcast:
       cardinality = max_stats.cardinality
     else:
       cardinality = agg_stats.cardinality

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
new file mode 100644
index 0000000..59e6a73
--- /dev/null
+++ b/tests/query_test/test_observability.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+class TestObservability(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  def test_merge_exchange_num_rows(self):
+    """Regression test for IMPALA-1473 - checks that the exec summary for a merging
+    exchange with a limit reports the number of rows returned as equal to the limit,
+    and that the coordinator fragment portion of the runtime profile reports the number
+    of rows returned correctly."""
+    query = """select tinyint_col, count(*) from functional.alltypes
+        group by tinyint_col order by tinyint_col limit 5"""
+    result = self.execute_query(query)
+    assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
+    assert result.exec_summary[0]['num_rows'] == 5
+    assert result.exec_summary[0]['est_num_rows'] == 5
+
+    for line in result.runtime_profile.split('\n'):
+      # The first 'RowsProduced' we find is for the coordinator fragment.
+      if 'RowsProduced' in line:
+        assert '(5)' in line
+        break
+
+  def test_broadcast_num_rows(self):
+    """Regression test for IMPALA-3002 - checks that the num_rows for a broadcast node
+    in the exec summary is correctly set as the max over all instances, not the sum."""
+    query = """select distinct a.int_col, a.string_col from functional.alltypes a
+        inner join functional.alltypessmall b on (a.id = b.id)
+        where a.year = 2009 and b.month = 2"""
+    result = self.execute_query(query)
+    assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
+    assert result.exec_summary[5]['num_rows'] == 25
+    assert result.exec_summary[5]['est_num_rows'] == 25

Reply via email to