IMPALA-4580: Fix crash with FETCH_FIRST when #rows < result cache size

The following sequence can lead to a crash:

  1. Client sets result cache size to N
  2. Client issues query with #results < N
  3. Client fetches all results, triggering eos and tearing down
     Coordinator::root_sink_.
  4. Client restarts query with FETCH_FIRST.
  5. Client reads all results again. After the cache is exhausted,
     Coordinator::GetNext() is called again to detect the eos condition.
  6. GetNext() hits DCHECK(root_sink_ != nullptr).

This patch makes GetNext() return immediately, re-setting *eos to true,
if it is called after it has already set *eos. This avoids the crash.

Testing:
  Added a regression test that triggers the crash without this fix.

Change-Id: I454cd8a6cf438bdd0c49fd27c2725d8f6c43bb1d
Reviewed-on: http://gerrit.cloudera.org:8080/5335
Reviewed-by: Henry Robinson <[email protected]>
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b9034ea0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b9034ea0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b9034ea0

Branch: refs/heads/master
Commit: b9034ea0d54ad40e11b482b577362ceee3768f1e
Parents: 694d72e
Author: Henry Robinson <[email protected]>
Authored: Wed Nov 30 22:04:49 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Sat Dec 3 11:07:04 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc |  7 +++++++
 be/src/runtime/coordinator.h  |  4 ++--
 tests/hs2/test_fetch_first.py | 30 ++++++++++++++++++++++++++++++
 3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b9034ea0/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 0a278ae..2b446df 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1104,6 +1104,13 @@ Status Coordinator::GetNext(QueryResultSet* results, int 
max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
+  if (returned_all_results_) {
+    // May be called after the first time we set *eos. Re-set *eos and return 
here;
+    // already torn-down root_sink_ so no more work to do.
+    *eos = true;
+    return Status::OK();
+  }
+
   DCHECK(root_sink_ != nullptr)
       << "GetNext() called without result sink. Perhaps Prepare() failed and 
was not "
       << "checked?";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b9034ea0/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index b6f1aa8..d53f16c 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -121,8 +121,8 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   /// Fills 'results' with up to 'max_rows' rows. May return fewer than 
'max_rows'
   /// rows, but will not return more.
   ///
-  /// If *eos is true, execution has completed and GetNext() must not be called
-  /// again.
+  /// If *eos is true, execution has completed. Subsequent calls to GetNext() 
will be a
+  /// no-op.
   ///
   /// GetNext() will not set *eos=true until all fragment instances have 
either completed
   /// or have failed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b9034ea0/tests/hs2/test_fetch_first.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_fetch_first.py b/tests/hs2/test_fetch_first.py
index ca9ebef..b2d5084 100644
--- a/tests/hs2/test_fetch_first.py
+++ b/tests/hs2/test_fetch_first.py
@@ -72,6 +72,36 @@ class TestFetchFirst(HS2TestSuite):
 
   @pytest.mark.execute_serially
   @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
+  def test_fetch_first_with_exhausted_cache(self):
+    """Regression test for IMPALA-4580. If a result cache is large enough to 
include all
+    results, and the fetch is restarted after all rows have been fetched, the 
final fetch
+    (internally) that returns EOS is not idempotent and can crash."""
+    RESULT_SET_SIZE = 100
+    execute_statement_req = TCLIService.TExecuteStatementReq()
+    execute_statement_req.sessionHandle = self.session_handle
+    execute_statement_req.confOverlay = dict()
+    execute_statement_req.confOverlay[self.IMPALA_RESULT_CACHING_OPT] =\
+      str(RESULT_SET_SIZE)
+    execute_statement_req.statement =\
+      "SELECT * FROM functional.alltypes ORDER BY id LIMIT %s" % 
RESULT_SET_SIZE
+    execute_statement_resp = 
self.hs2_client.ExecuteStatement(execute_statement_req)
+    HS2TestSuite.check_response(execute_statement_resp)
+
+    # First fetch more than the entire result set, ensuring that coordinator 
has hit EOS
+    # condition.
+    self.fetch_until(execute_statement_resp.operationHandle,
+                     TCLIService.TFetchOrientation.FETCH_NEXT, RESULT_SET_SIZE 
+ 1,
+                     RESULT_SET_SIZE)
+
+    # Now restart the fetch, again trying to fetch more than the full result 
set size so
+    # that the cache is exhausted and the coordinator is checked for more rows.
+    self.fetch_until(execute_statement_resp.operationHandle,
+                     TCLIService.TFetchOrientation.FETCH_FIRST, 
RESULT_SET_SIZE + 1,
+                     RESULT_SET_SIZE)
+    self.close(execute_statement_resp.operationHandle)
+
+  @pytest.mark.execute_serially
+  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
   def test_query_stmts_v6(self):
     self.run_query_stmts_test();
 

Reply via email to