Repository: impala
Updated Branches:
  refs/heads/2.x 1e126dacc -> f38e09bba


IMPALA-6554: Fix a race in DequeueDeferredRpc()

Previously, KrpcDataStreamRecvr::DequeueDeferredRpc() called
RespondAndReleaseRpc() outside of the sender queue's lock. Another
thread can race ahead and call KrpcDataStreamRecvr::Close() before
MemTracker::Release() is called on the receiver's MemTracker. This
may lead to an update to the receiver's MemTracker after it has been
closed. This change fixes the problem by updating the MemTracker
before dropping the lock in DequeueDeferredRpc(). This change also
changes RespondAndReleaseRpc() to update the MemTracker first before
responding to the RPC. The order doesn't really matter much because
the response of an RPC is handled asynchronously by reactor threads so
there is always a window between when the MemTracker is updated and when
the actual RPC payload is freed. The order is updated so it's consistent
with that of DequeueDeferredRpc().

Change-Id: I4a3b0789633c7b8bc898381d509e2af769f0e069
Reviewed-on: http://gerrit.cloudera.org:8080/9446
Reviewed-by: Dan Hecht <dhe...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/14da6bb7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/14da6bb7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/14da6bb7

Branch: refs/heads/2.x
Commit: 14da6bb753683b11fc0ad2a9f64fec101dcfa86f
Parents: 1e126da
Author: Michael Ho <k...@cloudera.com>
Authored: Sat Feb 24 01:23:34 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Fri Mar 2 04:10:16 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc        |  3 +-
 be/src/runtime/krpc-data-stream-mgr.cc   | 20 ++++--------
 be/src/runtime/krpc-data-stream-mgr.h    |  6 ----
 be/src/runtime/krpc-data-stream-recvr.cc | 44 +++++++++++++--------------
 be/src/runtime/krpc-data-stream-recvr.h  | 12 ++++++--
 be/src/service/data-stream-service.cc    | 16 ++++++++++
 be/src/service/data-stream-service.h     |  6 ++++
 7 files changed, 60 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/14da6bb7/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc 
b/be/src/rpc/impala-service-pool.cc
index fb50656..02c2fe0 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -130,9 +130,8 @@ void 
ImpalaServicePool::RejectTooBusy(kudu::rpc::InboundCall* c) {
 void ImpalaServicePool::FailAndReleaseRpc(
     const kudu::rpc::ErrorStatusPB::RpcErrorCodePB& error_code,
     const kudu::Status& status, kudu::rpc::InboundCall* call) {
-  int64_t transfer_size = call->GetTransferSize();
+  service_mem_tracker_->Release(call->GetTransferSize());
   call->RespondFailure(error_code, status);
-  service_mem_tracker_->Release(transfer_size);
 }
 
 kudu::rpc::RpcMethodInfo* ImpalaServicePool::LookupMethod(

http://git-wip-us.apache.org/repos/asf/impala/blob/14da6bb7/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc 
b/be/src/runtime/krpc-data-stream-mgr.cc
index 02609c8..40dd285 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -140,7 +140,7 @@ shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(
   for (const unique_ptr<EndDataStreamCtx>& ctx :
       early_senders_for_recvr.closed_sender_ctxs) {
     recvr->RemoveSender(ctx->request->sender_id());
-    RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context,
+    DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response, 
ctx->rpc_context,
         mem_tracker_.get());
     num_senders_waiting_->Increment(-1);
   }
@@ -228,7 +228,7 @@ void KrpcDataStreamMgr::AddData(const 
TransmitDataRequestPB* request,
     // detect this case by checking already_unregistered - if true then the 
receiver was
     // already closed deliberately, and there's no unexpected error here.
     ErrorMsg msg(TErrorCode::DATASTREAM_RECVR_CLOSED, PrintId(finst_id), 
dest_node_id);
-    RespondAndReleaseRpc(Status::Expected(msg), response, rpc_context,
+    DataStreamService::RespondAndReleaseRpc(Status::Expected(msg), response, 
rpc_context,
         service_mem_tracker_);
     return;
   }
@@ -284,7 +284,8 @@ void KrpcDataStreamMgr::CloseSender(const 
EndDataStreamRequestPB* request,
   // If we reach this point, either the receiver is found or it has been 
unregistered
   // already. In either cases, it's safe to just return an OK status.
   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
-  RespondAndReleaseRpc(Status::OK(), response, rpc_context, 
service_mem_tracker_);
+  DataStreamService::RespondAndReleaseRpc(Status::OK(), response, rpc_context,
+      service_mem_tracker_);
 }
 
 Status KrpcDataStreamMgr::DeregisterRecvr(
@@ -348,21 +349,12 @@ void KrpcDataStreamMgr::RespondToTimedOutSender(const 
std::unique_ptr<ContextTyp
   ErrorMsg msg(TErrorCode::DATASTREAM_SENDER_TIMEOUT, remote_addr, 
PrintId(finst_id),
       ctx->request->dest_node_id());
   VLOG_QUERY << msg.msg();
-  RespondAndReleaseRpc(Status::Expected(msg), ctx->response, ctx->rpc_context,
-      mem_tracker_.get());
+  DataStreamService::RespondAndReleaseRpc(Status::Expected(msg), ctx->response,
+      ctx->rpc_context, mem_tracker_.get());
   num_senders_waiting_->Increment(-1);
   num_senders_timedout_->Increment(1);
 }
 
-template<typename ResponsePBType>
-void KrpcDataStreamMgr::RespondAndReleaseRpc(const Status& status,
-    ResponsePBType* response, kudu::rpc::RpcContext* ctx, MemTracker* 
mem_tracker) {
-  status.ToProto(response->mutable_status());
-  int64_t transfer_size = ctx->GetTransferSize();
-  ctx->RespondSuccess();
-  mem_tracker->Release(transfer_size);
-}
-
 void KrpcDataStreamMgr::Maintenance() {
   const int32_t sleep_time_ms =
       min(max(1, FLAGS_datastream_sender_timeout_ms / 2), 10000);

http://git-wip-us.apache.org/repos/asf/impala/blob/14da6bb7/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h 
b/be/src/runtime/krpc-data-stream-mgr.h
index d5f9d0e..889aee5 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -480,12 +480,6 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   template<typename ContextType, typename RequestPBType>
   void RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx);
 
-  /// Respond to the RPC passed in 'response'/'ctx' with 'status' and release 
the payload
-  /// memory from 'mem_tracker'. Takes ownership of 'ctx'.
-  template<typename ResponsePBType>
-  void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
-      kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker);
-
   /// Notifies any sender that has been waiting for its receiver for more than
   /// FLAGS_datastream_sender_timeout_ms.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/14da6bb7/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc 
b/be/src/runtime/krpc-data-stream-recvr.cc
index 4f85996..60aacce 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -30,6 +30,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/sorted-run-merger.h"
+#include "service/data-stream-service.h"
 #include "util/runtime-profile-counters.h"
 #include "util/periodic-counter-updater.h"
 
@@ -127,10 +128,6 @@ class KrpcDataStreamRecvr::SenderQueue {
       const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
       unique_lock<SpinLock>* lock) WARN_UNUSED_RESULT;
 
-  // Respond to the TransmitData RPC passed in 'ctx' with 'status' and release 
the payload
-  // memory from the MemTracker associated with 'recvr_'.
-  void RespondAndReleaseRpc(const Status& status, const 
unique_ptr<TransmitDataCtx>& ctx);
-
   // Receiver of which this queue is a member.
   KrpcDataStreamRecvr* recvr_;
 
@@ -297,11 +294,13 @@ Status 
KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
     const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock) {
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
+  DCHECK(!is_cancelled_);
 
   COUNTER_ADD(recvr_->num_received_batches_, 1);
   COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
   // Reserve queue space before dropping the lock below.
   recvr_->num_buffered_bytes_.Add(batch_size);
+  // Bump 'num_pending_enqueue_' to avoid race with Close() when lock is 
dropped below.
   DCHECK_GE(num_pending_enqueue_, 0);
   ++num_pending_enqueue_;
 
@@ -316,7 +315,7 @@ Status 
KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
     // handle deleting any unconsumed batches from batch_queue_. Close() 
cannot proceed
     // until there are no pending insertion to batch_queue_.
     status = RowBatch::FromProtobuf(recvr_->row_desc(), header, tuple_offsets, 
tuple_data,
-        recvr_->mem_tracker(), recvr_->client(), &batch);
+        recvr_->parent_tracker(), recvr_->client(), &batch);
   }
   lock->lock();
 
@@ -334,14 +333,6 @@ Status 
KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   return Status::OK();
 }
 
-void KrpcDataStreamRecvr::SenderQueue::RespondAndReleaseRpc(const Status& 
status,
-    const unique_ptr<TransmitDataCtx>& ctx) {
-  int64_t transfer_size = ctx->rpc_context->GetTransferSize();
-  status.ToProto(ctx->response->mutable_status());
-  ctx->rpc_context->RespondSuccess();
-  recvr_->mem_tracker()->Release(transfer_size);
-}
-
 void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* 
request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
   // TODO: Add timers for time spent in this function and queue time in 
'batch_queue_'.
@@ -378,7 +369,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const 
TransmitDataRequestPB* req
     // batch needs to line up after the deferred RPCs to avoid starvation of 
senders
     // in the non-merging case.
     if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) {
-      recvr_->mem_tracker()->Consume(rpc_context->GetTransferSize());
+      recvr_->deferred_rpc_tracker()->Consume(rpc_context->GetTransferSize());
       auto payload = make_unique<TransmitDataCtx>(request, response, 
rpc_context);
       deferred_rpcs_.push(move(payload));
       COUNTER_ADD(recvr_->num_deferred_batches_, 1);
@@ -416,7 +407,8 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() 
{
         &tuple_data, &batch_size);
     // Reply with error status if the entry cannot be unpacked.
     if (UNLIKELY(!status.ok())) {
-      RespondAndReleaseRpc(status, ctx);
+      DataStreamService::RespondAndReleaseRpc(status, ctx->response, 
ctx->rpc_context,
+          recvr_->deferred_rpc_tracker());
       deferred_rpcs_.pop();
       return;
     }
@@ -434,21 +426,27 @@ void 
KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
     status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
     DCHECK(!status.ok() || !batch_queue_.empty());
+
+    // Release to MemTracker while still holding the lock to prevent race with 
Close().
+    
recvr_->deferred_rpc_tracker()->Release(ctx->rpc_context->GetTransferSize());
   }
 
   // Responds to the sender to ack the insertion of the row batches.
-  RespondAndReleaseRpc(status, ctx);
+  // No need to hold lock when enqueuing the response.
+  status.ToProto(ctx->response->mutable_status());
+  ctx->rpc_context->RespondSuccess();
 }
 
 void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
     unique_ptr<TransmitDataCtx> ctx) {
   int sender_id = ctx->request->sender_id();
-  recvr_->mem_tracker()->Consume(ctx->rpc_context->GetTransferSize());
+  recvr_->deferred_rpc_tracker()->Consume(ctx->rpc_context->GetTransferSize());
   COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
     if (UNLIKELY(is_cancelled_)) {
-      RespondAndReleaseRpc(Status::OK(), ctx);
+      DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response,
+          ctx->rpc_context, recvr_->deferred_rpc_tracker());
       return;
     }
     deferred_rpcs_.push(move(ctx));
@@ -478,8 +476,9 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
 
     // Respond to deferred RPCs.
     while (!deferred_rpcs_.empty()) {
-      const unique_ptr<TransmitDataCtx>& payload = deferred_rpcs_.front();
-      RespondAndReleaseRpc(Status::OK(), payload);
+      const unique_ptr<TransmitDataCtx>& ctx = deferred_rpcs_.front();
+      DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response,
+          ctx->rpc_context, recvr_->deferred_rpc_tracker());
       deferred_rpcs_.pop();
     }
   }
@@ -549,11 +548,12 @@ 
KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     row_desc_(row_desc),
     is_merging_(is_merging),
     num_buffered_bytes_(0),
+    deferred_rpc_tracker_(new MemTracker(-1, "KrpcDeferredRpcs", 
parent_tracker)),
+    parent_tracker_(parent_tracker),
     client_(client),
     profile_(profile),
     recvr_side_profile_(profile_->CreateChild("RecvrSide")),
     sender_side_profile_(profile_->CreateChild("SenderSide")) {
-  mem_tracker_.reset(new MemTracker(-1, "KrpcDataStreamRecvr", 
parent_tracker));
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
@@ -636,7 +636,7 @@ void KrpcDataStreamRecvr::Close() {
   mgr_ = nullptr;
   for (auto& queue: sender_queues_) queue->Close();
   merger_.reset();
-  mem_tracker_->Close();
+  deferred_rpc_tracker_->Close();
   recvr_side_profile_->StopPeriodicCounters();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/14da6bb7/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h 
b/be/src/runtime/krpc-data-stream-recvr.h
index 758079b..c07e0ec 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -102,7 +102,8 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   const TUniqueId& fragment_instance_id() const { return 
fragment_instance_id_; }
   PlanNodeId dest_node_id() const { return dest_node_id_; }
   const RowDescriptor* row_desc() const { return row_desc_; }
-  MemTracker* mem_tracker() const { return mem_tracker_.get(); }
+  MemTracker* deferred_rpc_tracker() const { return 
deferred_rpc_tracker_.get(); }
+  MemTracker* parent_tracker() const { return parent_tracker_; }
   BufferPool::ClientHandle* client() const { return client_; }
 
  private:
@@ -169,8 +170,13 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// total number of bytes held across all sender queues.
   AtomicInt32 num_buffered_bytes_;
 
-  /// Memtracker for batches in the sender queue(s).
-  boost::scoped_ptr<MemTracker> mem_tracker_;
+  /// Memtracker for payloads of deferred Rpcs in the sender queue(s).
+  /// This must be accessed with 'lock_' held to avoid race with Close().
+  boost::scoped_ptr<MemTracker> deferred_rpc_tracker_;
+
+  /// The MemTracker of the exchange node which owns this receiver. Not owned.
+  /// This is the MemTracker which 'client_' below internally references.
+  MemTracker* parent_tracker_;
 
   /// The buffer pool client for allocating buffers of incoming row batches. 
Not owned.
   BufferPool::ClientHandle* client_;

http://git-wip-us.apache.org/repos/asf/impala/blob/14da6bb7/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc 
b/be/src/service/data-stream-service.cc
index 34682d4..b04105b 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -79,4 +79,20 @@ void DataStreamService::TransmitData(const 
TransmitDataRequestPB* request,
   ExecEnv::GetInstance()->KrpcStreamMgr()->AddData(request, response, 
rpc_context);
 }
 
+template<typename ResponsePBType>
+void DataStreamService::RespondAndReleaseRpc(const Status& status,
+    ResponsePBType* response, kudu::rpc::RpcContext* ctx, MemTracker* 
mem_tracker) {
+  mem_tracker->Release(ctx->GetTransferSize());
+  status.ToProto(response->mutable_status());
+  ctx->RespondSuccess();
+}
+
+template void DataStreamService::RespondAndReleaseRpc(const Status& status,
+    TransmitDataResponsePB* response, kudu::rpc::RpcContext* ctx,
+    MemTracker* mem_tracker);
+
+template void DataStreamService::RespondAndReleaseRpc(const Status& status,
+    EndDataStreamResponsePB* response, kudu::rpc::RpcContext* ctx,
+    MemTracker* mem_tracker);
+
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/14da6bb7/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h 
b/be/src/service/data-stream-service.h
index 63a0bf7..cba27ae 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -56,6 +56,12 @@ class DataStreamService : public DataStreamServiceIf {
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
 
+  /// Respond to a RPC passed in 'response'/'ctx' with 'status' and release
+  /// the payload memory from 'mem_tracker'. Takes ownership of 'ctx'.
+  template<typename ResponsePBType>
+  static void RespondAndReleaseRpc(const Status& status, ResponsePBType* 
response,
+      kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker);
+
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
  private:

Reply via email to