Repository: impala
Updated Branches:
  refs/heads/master a78a8d62a -> 5e2dcd25d


IMPALA-7829: Mark a fragment instance as done only after Close() is called

As shown in IMPALA-7828, it is non-deterministic whether the errors detected
in FragmentInstanceState::Close() show up in the final profile sent to the
coordinator. The reason is that the current code marks a fragment instance as
"done" after ExecInternal() completes but before Close() is called, so there
is a window between when the final status report is sent and when Close()
finishes. Errors logged in that window can be missing from the final report.

This change fixes the problem by not sending the final report until Close()
has been called. This has no effect on the first-row-available time for normal
queries. It may slightly lengthen the first-row-available time for DML queries.
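
The ordering issue described above can be illustrated with a minimal,
self-contained C++ sketch. It is not Impala code: ToyInstance, FinalReport,
Snapshot(), ReportThenClose() and CloseThenReport() are hypothetical names
invented for this illustration, and the warning text is shortened. In the
real backend the old behaviour was a race, so the error only sometimes went
missing; the sketch models the losing side of that race deterministically.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a fragment instance; not Impala's real class.
struct ToyInstance {
  std::vector<std::string> error_log;  // stand-in for the runtime error log
  bool closed = false;

  // Stand-in for Close(): teardown may discover new errors, such as a
  // memory leak in a UDF.
  void Close() {
    assert(!closed);  // this sketch assumes Close() runs exactly once
    error_log.push_back("UDF WARNING: memory leaked");
    closed = true;
  }
};

// Stand-in for the final status report: a snapshot of the error log taken
// at the moment the instance is marked as done.
struct FinalReport {
  bool done;
  std::vector<std::string> errors;
};

FinalReport Snapshot(const ToyInstance& inst) {
  return FinalReport{true, inst.error_log};
}

// Old ordering: the instance is reported as done first and closed afterwards,
// so errors found during Close() are absent from the snapshot.
FinalReport ReportThenClose(ToyInstance& inst) {
  FinalReport report = Snapshot(inst);
  inst.Close();
  return report;
}

// New ordering: Close() runs first, so the snapshot includes its errors.
FinalReport CloseThenReport(ToyInstance& inst) {
  inst.Close();
  return Snapshot(inst);
}
```

With a fresh ToyInstance, ReportThenClose() returns a report with an empty
error list, while CloseThenReport() returns a report containing the warning
logged during Close().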

Testing done: Updated udf-no-expr-rewrite.test to exercise this code path.

Perf run on an 8-node cluster didn't show any regression:

TPCH-300
+------------+-----------------------+---------+------------+------------+----------------+
| Workload   | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+------------+-----------------------+---------+------------+------------+----------------+
| TPCH(_300) | parquet / none / none | 23.94   | -2.05%     | 12.55      | -2.62%         |
+------------+-----------------------+---------+------------+------------+----------------+

Small concurrency
+-------------------------+-----------------------+---------+------------+------------+----------------+
| Workload                | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+-------------------------+-----------------------+---------+------------+------------+----------------+
| TPCDS-UNMODIFIED(_1000) | parquet / none / none | 6.89    | -0.66%     | 6.62       | +0.41%         |
+-------------------------+-----------------------+---------+------------+------------+----------------+

Medium concurrency
+-------------------------+-----------------------+---------+------------+------------+----------------+
| Workload                | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+-------------------------+-----------------------+---------+------------+------------+----------------+
| TPCDS-UNMODIFIED(_1000) | parquet / none / none | 55.57   | -1.04%     | 55.27      | -0.98%         |
+-------------------------+-----------------------+---------+------------+------------+----------------+

Change-Id: I61618854ae3f4e7ef20028dcb0ff5cbcfa8adb01
Reviewed-on: http://gerrit.cloudera.org:8080/11939
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9ef9daca
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9ef9daca
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9ef9daca

Branch: refs/heads/master
Commit: 9ef9dacaf764aac869c8072eee823d1129657cf4
Parents: a78a8d6
Author: Michael Ho <[email protected]>
Authored: Thu Nov 8 18:23:43 2018 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Nov 20 23:49:11 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/fragment-instance-state.cc       | 20 ++++++++++----------
 be/src/runtime/fragment-instance-state.h        |  4 ----
 .../queries/QueryTest/udf-no-expr-rewrite.test  |  7 ++-----
 3 files changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9ef9daca/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index b9f9a90..89d388f 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -94,6 +94,10 @@ Status FragmentInstanceState::Exec() {
   }
 
 done:
+  // Don't transition to completion until Close() is called as some new errors may be
+  // logged in RuntimeState:error_log_.
+  Close();
+
   // Must update the fragment instance state first before updating the 'Query State'.
   // Otherwise, there is a race when reading the 'done' flag with GetStatusReport().
   // This may lead to the "final" profile being sent with the 'done' flag as false.
@@ -113,9 +117,6 @@ done:
     // Tell the managing 'QueryState' that we're done with executing.
     query_state_->DoneExecuting();
   }
-  // call this before Close() to make sure the thread token got released
-  Finalize(status);
-  Close();
   return status;
 }
 
@@ -339,6 +340,12 @@ Status FragmentInstanceState::ExecInternal() {
 void FragmentInstanceState::Close() {
   DCHECK(runtime_state_ != nullptr);
 
+  // If we haven't already released this thread token in Prepare(), release
+  // it before calling Close().
+  if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
+    ReleaseThreadToken();
+  }
+
   // guard against partially-finished Prepare()
   if (sink_ != nullptr) sink_->Close(runtime_state_);
 
@@ -440,13 +447,6 @@ void FragmentInstanceState::UpdateState(const StateEvent event)
   if (next_state != current_state) current_state_.Store(next_state);
 }
 
-void FragmentInstanceState::Finalize(const Status& status) {
-  if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
-    // if we haven't already release this thread token in Prepare(), release it now
-    ReleaseThreadToken();
-  }
-}
-
 void FragmentInstanceState::ReleaseThreadToken() {
   DCHECK(runtime_state_ != nullptr);
   DCHECK(runtime_state_->resource_pool() != nullptr);

http://git-wip-us.apache.org/repos/asf/impala/blob/9ef9daca/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 7636055..c7b8d79 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -268,10 +268,6 @@ class FragmentInstanceState {
   /// concurrently.
   void UpdateState(const StateEvent event);
 
-  /// Called when execution is complete to finalize counters. Must be called only once.
-  /// Can handle partially-finished Prepare().
-  void Finalize(const Status& status);
-
   /// Releases the thread token for this fragment executor. Can handle
   /// partially-finished Prepare().
   void ReleaseThreadToken();

http://git-wip-us.apache.org/repos/asf/impala/blob/9ef9daca/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test b/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test
index 364882e..d14723d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test
@@ -22,9 +22,6 @@ select mem_test_leaks(100);
 bigint
 ---- RESULTS
 100
-#---- ERRORS
-#TODO: Fix the non-determinisim below. IMPALA-7829.
-#A fragment instance's last status report may be sent before calling Close() which is
-#where the memory leak is detected. So, we don't always get the following error message.
-#UDF WARNING: Memory leaked via FunctionContext::Allocate(), 100 bytes leaked via FunctionContext::TrackAllocation()
+---- ERRORS
+UDF WARNING: Memory leaked via FunctionContext::Allocate(), 100 bytes leaked via FunctionContext::TrackAllocation()
 ====
