IMPALA-7011: Simplify PlanRootSink control logic

1) The eos_ and sender_done_ bits really encode three possible states
   that the sender can be in. Make this explicit using an enum with
   three values.

2) The purpose of CloseConsumer() has changed over time and we can clean
   this up now:

 a) Originally, it looks like it was used to unblock the sender when the
   consumer finished before eos, but also to keep the sink alive long
   enough for the coordinator. This is no longer necessary now that
   control structures are owned by the QueryState, whose lifetime is
   controlled by a reference count taken by the coordinator. So, we don't
   need the coordinator to tell the sink it's done calling it, and we
   don't need the consumer_done_ state.

 b) Later on, CloseConsumer() was used as a cancellation mechanism.
   We need to keep this around (or use timeouts on the condvars) to wake
   both the consumer and producer on cancellation. But let's make the
   cancellation logic similar to the exec nodes and other sinks by
   driving the cancellation using the RuntimeState's cancellation
   flag. Now that CloseConsumer() is only about cancellation, rename it
   to Cancel() (later we may promote it to DataSink and implement it in the
   data stream sender as well).

Testing:
- Exhaustive
- Minicluster concurrent_select.py stress

Change-Id: Ifc75617a253fd43a6122baa4b4dc7aeb1dbe633f
Reviewed-on: http://gerrit.cloudera.org:8080/10449
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3b8a9648
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3b8a9648
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3b8a9648

Branch: refs/heads/2.x
Commit: 3b8a96485e36336c8e823479559cd81edc2a231d
Parents: fd27b17
Author: Dan Hecht <[email protected]>
Authored: Thu May 17 17:03:54 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 be/src/exec/plan-root-sink.cc             | 39 +++++++-------
 be/src/exec/plan-root-sink.h              | 72 +++++++++++++-------------
 be/src/runtime/coordinator.cc             | 11 +---
 be/src/runtime/fragment-instance-state.cc |  6 +--
 4 files changed, 60 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 836a376..a64dbb9 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -72,11 +72,10 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* 
batch) {
   // written clients may not cope correctly with them. See IMPALA-4335.
   while (current_batch_row < batch->num_rows()) {
     unique_lock<mutex> l(lock_);
-    while (results_ == nullptr && !consumer_done_) sender_cv_.Wait(l);
-    if (consumer_done_ || batch == nullptr) {
-      eos_ = true;
-      return Status::OK();
-    }
+    // Wait until the consumer gives us a result set to fill in, or the 
fragment
+    // instance has been cancelled.
+    while (results_ == nullptr && !state->is_cancelled()) sender_cv_.Wait(l);
+    RETURN_IF_CANCELLED(state);
 
     // Otherwise the consumer is ready. Fill out the rows.
     DCHECK(results_ != nullptr);
@@ -107,29 +106,26 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* 
batch) {
 
 Status PlanRootSink::FlushFinal(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
-  sender_done_ = true;
-  eos_ = true;
+  sender_state_ = SenderState::EOS;
   consumer_cv_.NotifyAll();
   return Status::OK();
 }
 
 void PlanRootSink::Close(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
-  // No guarantee that FlushFinal() has been called, so need to mark 
sender_done_ here as
-  // well.
-  // TODO: shouldn't this also set eos to true? do we need both eos and 
sender_done_?
-  sender_done_ = true;
+  // FlushFinal() won't have been called when the fragment instance encounters 
an error
+  // before sending all rows.
+  if (sender_state_ == SenderState::ROWS_PENDING) {
+    sender_state_ = SenderState::CLOSED_NOT_EOS;
+  }
   consumer_cv_.NotifyAll();
-  // Wait for consumer to be done, in case sender tries to tear-down this sink 
while the
-  // sender is still reading from it.
-  while (!consumer_done_) sender_cv_.Wait(l);
   DataSink::Close(state);
 }
 
-void PlanRootSink::CloseConsumer() {
-  unique_lock<mutex> l(lock_);
-  consumer_done_ = true;
+void PlanRootSink::Cancel(RuntimeState* state) {
+  DCHECK(state->is_cancelled());
   sender_cv_.NotifyAll();
+  consumer_cv_.NotifyAll();
 }
 
 Status PlanRootSink::GetNext(
@@ -140,9 +136,14 @@ Status PlanRootSink::GetNext(
   num_rows_requested_ = num_results;
   sender_cv_.NotifyAll();
 
-  while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.Wait(l);
+  // Wait while the sender is still producing rows and hasn't filled in the 
current
+  // result set.
+  while (sender_state_ == SenderState::ROWS_PENDING && results_ != nullptr &&
+      !state->is_cancelled()) {
+    consumer_cv_.Wait(l);
+  }
 
-  *eos = eos_;
+  *eos = sender_state_ == SenderState::EOS;
   return state->GetQueryStatus();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 87ab3ef..1d64b21 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -36,19 +36,25 @@ class ScalarExprEvaluator;
 /// The consumer calls GetNext() with a QueryResultSet and a requested fetch
 /// size. GetNext() shares these fields with Send(), and then signals Send() 
to begin
 /// populating the result set. GetNext() returns when a) the sender has sent 
all of its
-/// rows b) the requested fetch size has been satisfied or c) the sender calls 
Close().
+/// rows b) the requested fetch size has been satisfied or c) the sender 
fragment
+/// instance was cancelled.
 ///
-/// Send() fills in as many rows as are requested from the current batch. When 
the batch
-/// is exhausted - which may take several calls to GetNext() - control is 
returned to the
-/// sender to produce another row batch.
+/// The sender uses Send() to fill in as many rows as are requested from the 
current
+/// batch. When the batch is exhausted - which may take several calls to 
GetNext() -
+/// Send() returns so that the fragment instance can produce another row batch.
 ///
-/// When the consumer is finished, CloseConsumer() must be called to allow the 
sender to
-/// exit Send(). Senders must call Close() to signal to the consumer that no 
more batches
-/// will be produced. CloseConsumer() may be called concurrently with 
GetNext(). Senders
-/// should ensure that the consumer is not blocked in GetNext() before 
destroying the
-/// PlanRootSink.
+/// FlushFinal() should be called by the sender to signal it has finished 
calling
+/// Send() for all rows. Close() should be called by the sender to release 
resources.
 ///
-/// The sink is thread safe up to a single producer and single consumer.
+/// When the fragment instance is cancelled, Cancel() is called to unblock 
both the
+/// sender and consumer. Cancel() may be called concurrently with Send(), 
GetNext() and
+/// Close().
+///
+/// The sink is thread safe up to a single sender and single consumer.
+///
+/// Lifetime: The sink is owned by the QueryState and has the same lifetime as
+/// QueryState. The QueryState references from the fragment instance and the 
Coordinator
+/// ensure that this outlives any calls to Send() and GetNext(), respectively.
 ///
 /// TODO: The consumer drives the sender in lock-step with GetNext() calls, 
forcing a
 /// context-switch on every invocation. Measure the impact of this, and 
consider moving to
@@ -62,25 +68,23 @@ class PlanRootSink : public DataSink {
   /// consumer has consumed 'batch' by calling GetNext().
   virtual Status Send(RuntimeState* state, RowBatch* batch);
 
-  /// Sets eos and notifies consumer.
+  /// Indicates eos and notifies consumer.
   virtual Status FlushFinal(RuntimeState* state);
 
-  /// To be called by sender only. Signals to the consumer that no more 
batches will be
-  /// produced, then blocks until someone calls CloseConsumer().
+  /// To be called by sender only. Releases resources and unblocks the consumer.
   virtual void Close(RuntimeState* state);
 
-  /// Populates 'result_set' with up to 'num_rows' rows produced by the 
fragment instance
-  /// that calls Send(). *eos is set to 'true' when there are no more rows to 
consume. If
-  /// CloseConsumer() is called concurrently, GetNext() will return and may 
not populate
-  /// 'result_set'. All subsequent calls after CloseConsumer() will do no work.
+  /// To be called by the consumer only. Populates 'result_set' with up to 'num_rows'
+  /// rows produced by the fragment instance that calls Send(). *eos is set to 'true'
+  /// when there are no more rows to consume. If Cancel() or Close() are called
+  /// concurrently, GetNext() will return and may not populate 'result_set'. All
+  /// subsequent calls after Cancel() or Close() are no-ops.
   Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* 
eos);
 
-  /// Signals to the producer that the sink will no longer be used. GetNext() 
may be
-  /// safely called after this returns (it does nothing), but consumers should 
consider
-  /// that the PlanRootSink may be undergoing destruction. May be called more 
than once;
-  /// only the first call has any effect.
-  void CloseConsumer();
+  /// Unblocks both the consumer and sender so they can check the cancellation 
flag in
+  /// the RuntimeState. The cancellation flag should be set prior to calling 
this.
+  void Cancel(RuntimeState* state);
 
   static const std::string NAME;
 
@@ -90,21 +94,22 @@ class PlanRootSink : public DataSink {
 
   /// Waited on by the sender only. Signalled when the consumer has written 
results_ and
   /// num_rows_requested_, and so the sender may begin satisfying that request 
for rows
-  /// from its current batch. Also signalled when CloseConsumer() is called, 
to unblock
-  /// the sender.
+  /// from its current batch. Also signalled when Cancel() is called, to 
unblock the
+  /// sender.
   ConditionVariable sender_cv_;
 
   /// Waited on by the consumer only. Signalled when the sender has finished 
serving a
-  /// request for rows. Also signalled by Close() and FlushFinal() to signal 
to the
-  /// consumer that no more rows are coming.
+  /// request for rows. Also signalled by FlushFinal(), Close() and Cancel() 
to unblock
+  /// the consumer.
   ConditionVariable consumer_cv_;
 
-  /// Signals to producer that the consumer is done, and the sink may be torn 
down.
-  bool consumer_done_ = false;
-
-  /// Signals to consumer that the sender is done, and that there are no more 
row batches
-  /// to consume.
-  bool sender_done_ = false;
+  /// State of the sender:
+  /// - ROWS_PENDING: the sender is still producing rows; the only 
non-terminal state
+  /// - EOS: the sender has passed all rows to Send()
+  /// - CLOSED_NOT_EOS: the sender (i.e. sink) was closed before all rows were 
passed to
+  ///   Send()
+  enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
+  SenderState sender_state_ = SenderState::ROWS_PENDING;
 
   /// The current result set passed to GetNext(), to fill in Send(). Not owned 
by this
   /// sink. Reset to nullptr after Send() completes the request to signal to 
the consumer
@@ -114,9 +119,6 @@ class PlanRootSink : public DataSink {
   /// Set by GetNext() to indicate to Send() how many rows it should write to 
results_.
   int num_rows_requested_ = 0;
 
-  /// Set to true in Send() and FlushFinal() when the Sink() has finished 
producing rows.
-  bool eos_ = false;
-
   /// Writes a single row into 'result' and 'scales' by evaluating
   /// output_expr_evals_ over 'row'.
   void GetRowValue(TupleRow* row, std::vector<void*>* result, 
std::vector<int>* scales);

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 042605d..995d747 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -148,14 +148,8 @@ Status Coordinator::Exec() {
       DCHECK(!prepare_status.ok());
       return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
     }
-
-    // When GetFInstanceState() returns the coordinator instance, the Prepare 
phase
-    // is done and the FragmentInstanceState's root sink will be set up. At 
that point,
-    // the coordinator must be sure to call root_sink()->CloseConsumer(); the
-    // fragment instance's executor will not complete until that point.
-    // TODO: what does this mean?
-    // TODO: Consider moving this to Wait().
-    // TODO: clarify need for synchronization on this event
+    // When GetFInstanceState() returns the coordinator instance, the Prepare 
phase is
+    // done and the FragmentInstanceState's root sink will be set up.
     DCHECK(coord_instance_->IsPrepared() && 
coord_instance_->WaitForPrepare().ok());
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
@@ -527,7 +521,6 @@ void Coordinator::HandleExecStateTransition(
       exec_rpcs_complete_barrier_->pending() <= 0) << "exec rpcs not 
completed";
 
   query_events_->MarkEvent(exec_state_to_event.at(new_state));
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // This thread won the race to transitioning into a terminal state - 
terminate
   // execution and release resources.
   ReleaseExecResources();

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index a14bf31..c61cb81 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -103,13 +103,9 @@ void FragmentInstanceState::Cancel() {
   // being cancelled.
   discard_result(WaitForPrepare());
 
-  // Ensure that the sink is closed from both sides. Although in ordinary 
executions we
-  // rely on the consumer to do this, in error cases the consumer may not be 
able to send
-  // CloseConsumer() (see IMPALA-4348 for an example).
-  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
-
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->set_is_cancelled();
+  if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
   runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 

Reply via email to