IMPALA-6116: Bound memory usage of DataStreamService's service queue

The fix for IMPALA-6193 added a memory tracker for the memory consumed
by the payloads in the service queue of DataStreamService. This change
extends it by introducing a bound on the memory usage of that service
queue. In addition, it deprecates FLAGS_datastream_service_queue_depth
and replaces it with FLAGS_datastream_service_queue_mem_limit. These flags
only take effect when KRPC is in use, and KRPC was never enabled in any
previous release, so it seems safe to do this flag replacement. The new
flag FLAGS_datastream_service_queue_mem_limit directly dictates the amount
of memory which can be consumed by the service queue of DataStreamService.
This allows more direct control over the memory usage of the queue instead
of inferring it from the number of entries in the queue. The default value
of this flag is left at 0, in which case the limit is set to 5% of the
process memory limit.

Testing done: exhaustive debug builds. Updated data-stream-test to
exercise the case in which the payload is larger than the limit.

Change-Id: Idea4262dfb0e0aa8d58ff6ea6a8aaaa248e880b9
Reviewed-on: http://gerrit.cloudera.org:8080/9282
Reviewed-by: Michael Ho <k...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b961de23
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b961de23
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b961de23

Branch: refs/heads/2.x
Commit: b961de23da039b599fb94dddcad1f304ed29eb00
Parents: 9a38902
Author: Michael Ho <k...@cloudera.com>
Authored: Sat Feb 3 00:05:09 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Sat Feb 17 23:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc           |  48 ++++++----
 be/src/rpc/impala-service-pool.h            |  62 ++++++++-----
 be/src/rpc/rpc-mgr-test-base.h              |  48 ++++++----
 be/src/rpc/rpc-mgr-test.cc                  |  15 ++--
 be/src/rpc/rpc-mgr.cc                       |   7 +-
 be/src/rpc/rpc-mgr.h                        |   5 +-
 be/src/runtime/data-stream-test.cc          | 110 +++++++++++++++--------
 be/src/runtime/exec-env.cc                  |  31 ++-----
 be/src/runtime/exec-env.h                   |   3 +
 be/src/runtime/krpc-data-stream-mgr.cc      |  37 ++++----
 be/src/runtime/krpc-data-stream-mgr.h       |  25 +++---
 be/src/runtime/mem-tracker.h                |   1 +
 be/src/service/data-stream-service.cc       |  33 ++++++-
 be/src/service/data-stream-service.h        |  15 +++-
 tests/custom_cluster/test_krpc_mem_usage.py |   4 +-
 15 files changed, 282 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc 
b/be/src/rpc/impala-service-pool.cc
index 34a3960..35a5d6d 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -47,15 +47,14 @@ METRIC_DEFINE_histogram(server, impala_unused,
 
 namespace impala {
 
-ImpalaServicePool::ImpalaServicePool(MemTracker* mem_tracker,
-                         std::unique_ptr<kudu::rpc::ServiceIf> service,
-                         const scoped_refptr<kudu::MetricEntity>& entity,
-                         size_t service_queue_length)
-  : mem_tracker_(mem_tracker),
-    service_(std::move(service)),
+ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& 
entity,
+    size_t service_queue_length, kudu::rpc::ServiceIf* service,
+    MemTracker* service_mem_tracker)
+  : service_mem_tracker_(service_mem_tracker),
+    service_(service),
     service_queue_(service_queue_length),
     unused_histogram_(METRIC_impala_unused.Instantiate(entity)) {
-  DCHECK(mem_tracker_ != nullptr);
+  DCHECK(service_mem_tracker_ != nullptr);
 }
 
 ImpalaServicePool::~ImpalaServicePool() {
@@ -114,7 +113,7 @@ void ImpalaServicePool::FailAndReleaseRpc(
     const kudu::Status& status, kudu::rpc::InboundCall* call) {
   int64_t transfer_size = call->GetTransferSize();
   call->RespondFailure(error_code, status);
-  mem_tracker_->Release(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
 }
 
 kudu::rpc::RpcMethodInfo* ImpalaServicePool::LookupMethod(
@@ -143,20 +142,39 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
 
   TRACE_TO(c->trace(), "Inserting onto call queue"); // NOLINT(*)
 
-  // Queue message on service queue
-  mem_tracker_->Consume(c->GetTransferSize());
+  // Queue message on service queue.
+  const int64_t transfer_size = c->GetTransferSize();
+  {
+    // Drops an incoming request if consumption already exceeded the limit. 
Note that
+    // the current inbound call isn't counted towards the limit yet so adding 
this call
+    // may cause the MemTracker's limit to be exceeded. This is done to ensure 
fairness
+    // among all inbound calls, otherwise calls with larger payloads are more 
likely to
+    // fail. The check and the consumption need to be atomic so as to bound 
the memory
+    // usage.
+    unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_);
+    if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded())) {
+      // Discards the transfer early so the transfer size drops to 0. This is 
to ensure
+      // the MemTracker::Release() call in FailAndReleaseRpc() is correct as 
we haven't
+      // called MemTracker::Consume() at this point.
+      mem_tracker_lock.unlock();
+      c->DiscardTransfer();
+      RejectTooBusy(c);
+      return kudu::Status::OK();
+    }
+    service_mem_tracker_->Consume(transfer_size);
+  }
+
   boost::optional<kudu::rpc::InboundCall*> evicted;
   auto queue_status = service_queue_.Put(c, &evicted);
-  if (queue_status == kudu::rpc::QueueStatus::QUEUE_FULL) {
+  if (UNLIKELY(queue_status == kudu::rpc::QueueStatus::QUEUE_FULL)) {
     RejectTooBusy(c);
     return kudu::Status::OK();
   }
-
-  if (PREDICT_FALSE(evicted != boost::none)) {
+  if (UNLIKELY(evicted != boost::none)) {
     RejectTooBusy(*evicted);
   }
 
-  if (PREDICT_TRUE(queue_status == kudu::rpc::QueueStatus::QUEUE_SUCCESS)) {
+  if (LIKELY(queue_status == kudu::rpc::QueueStatus::QUEUE_SUCCESS)) {
     // NB: do not do anything with 'c' after it is successfully queued --
     // a service thread may have already dequeued it, processed it, and
     // responded by this point, in which case the pointer would be invalid.
@@ -187,7 +205,7 @@ void ImpalaServicePool::RunThread() {
     incoming->RecordHandlingStarted(unused_histogram_);
     ADOPT_TRACE(incoming->trace());
 
-    if (PREDICT_FALSE(incoming->ClientTimedOut())) {
+    if (UNLIKELY(incoming->ClientTimedOut())) {
       TRACE_TO(incoming->trace(), "Skipping call since client already timed 
out"); // NOLINT(*)
       rpcs_timed_out_in_queue_.Add(1);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index fe70686..624e937 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -28,25 +28,33 @@
 #include "kudu/rpc/service_queue.h"
 #include "kudu/util/status.h"
 #include "util/histogram-metric.h"
+#include "util/spinlock.h"
 #include "util/thread.h"
 
 namespace impala {
 class MemTracker;
 
-// A pool of threads that handle new incoming RPC calls.
-// Also includes a queue that calls get pushed onto for handling by the pool.
+/// A pool of threads that handle new incoming RPC calls.
+/// Also includes a queue that calls get pushed onto for handling by the pool.
 class ImpalaServicePool : public kudu::rpc::RpcService {
  public:
-  ImpalaServicePool(MemTracker* mem_tracker,
-      std::unique_ptr<kudu::rpc::ServiceIf> service,
-      const scoped_refptr<kudu::MetricEntity>& metric_entity,
-      size_t service_queue_length);
+  /// 'service_queue_length' is the maximum number of requests that may be 
queued for
+  /// this service before clients begin to see rejection errors.
+  ///
+  /// 'service' contains an interface implementation that will handle RPCs.
+  ///
+  /// 'service_mem_tracker' is the MemTracker for tracking the memory usage of 
RPC
+  /// payloads in the service queue.
+  ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
+      size_t service_queue_length, kudu::rpc::ServiceIf* service,
+      MemTracker* service_mem_tracker);
+
   virtual ~ImpalaServicePool();
 
-  // Start up the thread pool.
+  /// Start up the thread pool.
   virtual Status Init(int num_threads);
 
-  // Shut down the queue and the thread pool.
+  /// Shut down the queue and the thread pool.
   virtual void Shutdown();
 
   kudu::rpc::RpcMethodInfo* LookupMethod(const kudu::rpc::RemoteMethod& 
method) override;
@@ -60,31 +68,41 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   void RunThread();
   void RejectTooBusy(kudu::rpc::InboundCall* c);
 
-  // Respond with failure to the incoming call in 'call' with 'error_code' and 
'status'
-  // and release the payload memory from 'mem_tracker_'. Takes ownership of 
'call'.
+  /// Respond with failure to the incoming call in 'call' with 'error_code' 
and 'status'
+  /// and release the payload memory from 'mem_tracker_'. Takes ownership of 
'call'.
   void FailAndReleaseRpc(const kudu::rpc::ErrorStatusPB::RpcErrorCodePB& 
error_code,
       const kudu::Status& status, kudu::rpc::InboundCall* call);
 
-  // Tracks memory of inbound calls in 'service_queue_'.
-  MemTracker* const mem_tracker_;
+  /// Synchronizes accesses to 'service_mem_tracker_' to avoid over 
consumption.
+  SpinLock mem_tracker_lock_;
+
+  /// Tracks memory of inbound calls in 'service_queue_'.
+  MemTracker* const service_mem_tracker_;
+
+  /// Reference to the implementation of the RPC handlers. Not owned.
+  kudu::rpc::ServiceIf* const service_;
 
-  std::unique_ptr<kudu::rpc::ServiceIf> service_;
-  std::vector<std::unique_ptr<Thread> > threads_;
+  /// The set of service threads started to process incoming RPC calls.
+  std::vector<std::unique_ptr<Thread>> threads_;
+
+  /// The pending RPCs to be dequeued by the service threads.
   kudu::rpc::LifoServiceQueue service_queue_;
 
-  // TODO: Display these metrics in the debug webpage. IMPALA-6269
-  // Number of RPCs that timed out while waiting in the service queue.
+  /// TODO: Display these metrics in the debug webpage. IMPALA-6269
+  /// Number of RPCs that timed out while waiting in the service queue.
   AtomicInt32 rpcs_timed_out_in_queue_;
-  // Number of RPCs that were rejected due to the queue being full.
+
+  /// Number of RPCs that were rejected due to the queue being full.
   AtomicInt32 rpcs_queue_overflow_;
 
-  // Dummy histogram needed to call InboundCall::RecordHandlingStarted() to set
-  // appropriate internal KRPC state. Unused otherwise.
-  // TODO: Consider displaying this histogram in the debug webpage. IMPALA-6269
+  /// Dummy histogram needed to call InboundCall::RecordHandlingStarted() to 
set
+  /// appropriate internal KRPC state. Unused otherwise.
+  /// TODO: Consider displaying this histogram in the debug webpage. 
IMPALA-6269
   scoped_refptr<kudu::Histogram> unused_histogram_;
 
-  // Protects against concurrent Shutdown() operations.
-  // TODO: This seems implausible given our current usage pattern. Consider 
removing lock.
+  /// Protects against concurrent Shutdown() operations.
+  /// TODO: This seems implausible given our current usage pattern.
+  /// Consider removing lock.
   boost::mutex shutdown_lock_;
   bool closing_ = false;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
index 43b6d83..4a79040 100644
--- a/be/src/rpc/rpc-mgr-test-base.h
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -130,7 +130,12 @@ template <class T> class RpcMgrTestBase : public T {
     request->set_sidecar_idx(idx);
   }
 
-  MemTracker* service_tracker() { return &service_tracker_; }
+  // Takes over ownership of the newly created 'service' which needs to have a 
lifetime
+  // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 
'service'.
+  ServiceIf* TakeOverService(std::unique_ptr<ServiceIf> service) {
+    services_.emplace_back(move(service));
+    return services_.back().get();
+  }
 
  protected:
   TNetworkAddress krpc_address_;
@@ -149,7 +154,9 @@ template <class T> class RpcMgrTestBase : public T {
 
  private:
   int32_t payload_[PAYLOAD_SIZE];
-  MemTracker service_tracker_;
+
+  // Own all the services used by the test.
+  std::vector<std::unique_ptr<ServiceIf>> services_;
 };
 
 typedef std::function<void(RpcContext*)> ServiceCB;
@@ -158,28 +165,30 @@ class PingServiceImpl : public PingServiceIf {
  public:
   // 'cb' is a callback used by tests to inject custom behaviour into the RPC 
handler.
   PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* 
mem_tracker,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
       ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
+    : PingServiceIf(entity, tracker), mem_tracker_(-1, "Ping Service"), 
cb_(cb) {}
 
   virtual void Ping(
       const PingRequestPB* request, PingResponsePB* response, RpcContext* 
context) {
     response->set_int_response(42);
     // Incoming requests will already be tracked and we need to release the 
memory.
-    mem_tracker_->Release(context->GetTransferSize());
+    mem_tracker_.Release(context->GetTransferSize());
     cb_(context);
   }
 
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
  private:
-  MemTracker* mem_tracker_;
+  MemTracker mem_tracker_;
   ServiceCB cb_;
 };
 
 class ScanMemServiceImpl : public ScanMemServiceIf {
  public:
   ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* 
mem_tracker)
-    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
+    : ScanMemServiceIf(entity, tracker), mem_tracker_(-1, "ScanMem Service") {
   }
 
   // The request comes with an int 'pattern' and a payload of int array sent 
with
@@ -197,36 +206,39 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
       int32_t val = v[i];
       if (val != pattern) {
         // Incoming requests will already be tracked and we need to release 
the memory.
-        mem_tracker_->Release(context->GetTransferSize());
+        mem_tracker_.Release(context->GetTransferSize());
         context->RespondFailure(kudu::Status::Corruption(
             Substitute("Expecting $1; Found $2", pattern, val)));
         return;
       }
     }
     // Incoming requests will already be tracked and we need to release the 
memory.
-    mem_tracker_->Release(context->GetTransferSize());
+    mem_tracker_.Release(context->GetTransferSize());
     context->RespondSuccess();
   }
 
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
  private:
-  MemTracker* mem_tracker_;
+  MemTracker mem_tracker_;
 
 };
 
 template <class T>
 Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
     RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
-  MemTracker* mem_tracker = test_base->service_tracker();
   // Test that a service can be started, and will respond to requests.
-  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), 
mem_tracker));
+  ServiceIf* ping_impl = 
test_base->TakeOverService(make_unique<PingServiceImpl>(
+      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
 
   // Test that a second service, that verifies the RPC payload is not 
corrupted,
   // can be started.
-  unique_ptr<ServiceIf> scan_mem_impl(new 
ScanMemServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), 
mem_tracker));
+  ServiceIf* scan_mem_impl = 
test_base->TakeOverService(make_unique<ScanMemServiceImpl>(
+      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index cd24672..8d5312f 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -178,12 +178,12 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  unique_ptr<ServiceIf> impl(new PingServiceImpl(rpc_mgr_.metric_entity(),
-      rpc_mgr_.result_tracker(), service_tracker(), slow_cb));
+  ServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
+      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
-  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, 
move(impl),
-      service_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, 
ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
@@ -204,9 +204,10 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  unique_ptr<ServiceIf> scan_mem_impl(new 
ScanMemServiceImpl(rpc_mgr_.metric_entity(),
-      rpc_mgr_.result_tracker(), service_tracker()));
-  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl), 
service_tracker()));
+  ServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
+      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
   ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, 
&scan_mem_proxy));

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 44ecc02..d723280 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -120,12 +120,11 @@ Status RpcMgr::Init() {
 }
 
 Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t 
service_queue_depth,
-    unique_ptr<ServiceIf> service_ptr, MemTracker* mem_tracker) {
+    ServiceIf* service_ptr, MemTracker* service_mem_tracker) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after 
StartServices()";
-  scoped_refptr<ImpalaServicePool> service_pool =
-      new ImpalaServicePool(mem_tracker, std::move(service_ptr),
-          messenger_->metric_entity(), service_queue_depth);
+  scoped_refptr<ImpalaServicePool> service_pool = new ImpalaServicePool(
+      messenger_->metric_entity(), service_queue_depth, service_ptr, 
service_mem_tracker);
   // Start the thread pool first before registering the service in case the 
startup fails.
   RETURN_IF_ERROR(service_pool->Init(num_service_threads));
   KUDU_RETURN_IF_ERROR(

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index fc74c2e..e87b559 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -122,9 +122,12 @@ class RpcMgr {
   /// the service name has to be unique within an Impala instance or the 
registration will
   /// fail.
   ///
+  /// 'service_mem_tracker' is the MemTracker for tracking the memory usage of 
RPC
+  /// payloads in the service queue.
+  ///
   /// It is an error to call this after StartServices() has been called.
   Status RegisterService(int32_t num_service_threads, int32_t 
service_queue_depth,
-      std::unique_ptr<kudu::rpc::ServiceIf> service_ptr, MemTracker* 
mem_tracker)
+      kudu::rpc::ServiceIf* service_ptr, MemTracker* service_mem_tracker)
       WARN_UNUSED_RESULT;
 
   /// Creates a new proxy for a remote service of type P at location 
'address', and places

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 75d5ac9..c540d1d 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -51,6 +51,7 @@
 #include "util/thread.h"
 #include "util/time.h"
 #include "util/mem-info.h"
+#include "util/parse-util.h"
 #include "util/test-info.h"
 #include "util/tuple-row-compare.h"
 #include "gen-cpp/data_stream_service.pb.h"
@@ -61,6 +62,7 @@
 #include "service/fe-support.h"
 
 #include <iostream>
+#include <string>
 #include <unistd.h>
 
 #include "common/names.h"
@@ -78,6 +80,7 @@ DEFINE_int32(port, 20001, "port on which to run Impala Thrift 
based test backend
 DECLARE_int32(datastream_sender_timeout_ms);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 DECLARE_int32(datastream_service_deserialization_queue_size);
+DECLARE_string(datastream_service_queue_mem_limit);
 
 DECLARE_bool(use_krpc);
 
@@ -90,7 +93,7 @@ static const int BATCH_CAPACITY = 100;  // rows
 static const int PER_ROW_DATA = 8;
 static const int TOTAL_DATA_SIZE = 8 * 1024;
 static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA;
-
+static const int SHORT_SERVICE_QUEUE_MEM_LIMIT = 16;
 
 namespace impala {
 
@@ -133,9 +136,22 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
  public:
   ImpalaKRPCTestBackend(RpcMgr* rpc_mgr, KrpcDataStreamMgr* stream_mgr)
     : DataStreamServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
-      stream_mgr_(stream_mgr) {}
+      rpc_mgr_(rpc_mgr),
+      stream_mgr_(stream_mgr) {
+    MemTracker* process_mem_tracker = 
ExecEnv::GetInstance()->process_mem_tracker();
+    bool is_percent;
+    int64_t bytes_limit = 
ParseUtil::ParseMemSpec(FLAGS_datastream_service_queue_mem_limit,
+        &is_percent, process_mem_tracker->limit());
+    mem_tracker_.reset(
+        new MemTracker(bytes_limit, "DataStream Test", process_mem_tracker));
+  }
+
   virtual ~ImpalaKRPCTestBackend() {}
 
+  Status Init() {
+    return rpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, this, 
mem_tracker());
+  }
+
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, RpcContext* rpc_context) {
     stream_mgr_->AddData(request, response, rpc_context);
@@ -146,8 +162,12 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
     stream_mgr_->CloseSender(request, response, rpc_context);
   }
 
+  MemTracker* mem_tracker() { return mem_tracker_.get(); }
+
  private:
+  RpcMgr* rpc_mgr_;
   KrpcDataStreamMgr* stream_mgr_;
+  unique_ptr<MemTracker> mem_tracker_;
 };
 
 template <class T> class DataStreamTestBase : public T {
@@ -161,7 +181,7 @@ enum KrpcSwitch {
   USE_KRPC
 };
 
-class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwitch> > {
+class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwitch>> {
  protected:
   DataStreamTest() : next_val_(0) {
 
@@ -188,7 +208,6 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
     next_instance_id_.lo = 0;
     next_instance_id_.hi = 0;
     stream_mgr_ = ExecEnv::GetInstance()->stream_mgr();
-    if (GetParam() == USE_KRPC) krpc_mgr_ = ExecEnv::GetInstance()->rpc_mgr();
 
     broadcast_sink_.dest_node_id = DEST_NODE_ID;
     broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;
@@ -219,6 +238,9 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
     if (GetParam() == USE_THRIFT) {
       StartThriftBackend();
     } else {
+      IpAddr ip;
+      ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+      krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
       StartKrpcBackend();
     }
   }
@@ -281,12 +303,14 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
   int64_t* tuple_mem_;
 
   // Only used for KRPC. Not owned.
-  RpcMgr* krpc_mgr_ = nullptr;
   TNetworkAddress krpc_address_;
 
+  // The test service implementation. Owned by this class.
+  unique_ptr<ImpalaKRPCTestBackend> test_service_;
+
   // receiving node
   DataStreamMgrBase* stream_mgr_ = nullptr;
-  ThriftServer* server_;
+  ThriftServer* server_ = nullptr;
 
   // sending node(s)
   TDataStreamSink broadcast_sink_;
@@ -387,8 +411,8 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
   RowBatch* CreateRowBatch() {
     RowBatch* batch = new RowBatch(row_desc_, BATCH_CAPACITY, &tracker_);
     int64_t* tuple_mem = reinterpret_cast<int64_t*>(
-        batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * 8));
-    bzero(tuple_mem, BATCH_CAPACITY * 8);
+        batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * PER_ROW_DATA));
+    bzero(tuple_mem, BATCH_CAPACITY * PER_ROW_DATA);
     for (int i = 0; i < BATCH_CAPACITY; ++i) {
       int idx = batch->AddRow();
       TupleRow* row = batch->GetRow(idx);
@@ -529,7 +553,7 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
     // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived 
type
     // DataStreamMgr, since ImpalaThriftTestBackend() accepts only 
DataStreamMgr*.
     boost::shared_ptr<ImpalaThriftTestBackend> handler(
-        new 
ImpalaThriftTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
+        new 
ImpalaThriftTestBackend(ExecEnv::GetInstance()->ThriftStreamMgr()));
     boost::shared_ptr<TProcessor> processor(new 
ImpalaInternalServiceProcessor(handler));
     ThriftServerBuilder builder("DataStreamTest backend", processor, 
FLAGS_port);
     ASSERT_OK(builder.Build(&server_));
@@ -537,26 +561,13 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 
   void StartKrpcBackend() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
-
-    MemTracker* data_svc_tracker = obj_pool_.Add(
-        new MemTracker(-1, "Data Stream Service",
-            ExecEnv::GetInstance()->process_mem_tracker()));
-    MemTracker* stream_mgr_tracker = obj_pool_.Add(
-        new MemTracker(-1, "Data Stream Queued RPC Calls",
-            ExecEnv::GetInstance()->process_mem_tracker()));
-
-    KrpcDataStreamMgr* stream_mgr_ref = 
dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_);
-    ASSERT_OK(stream_mgr_ref->Init(stream_mgr_tracker, data_svc_tracker));
-    ASSERT_OK(krpc_mgr_->Init());
-
-    unique_ptr<ServiceIf> handler(
-        new ImpalaKRPCTestBackend(krpc_mgr_, stream_mgr_ref));
-    ASSERT_OK(krpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, 
move(handler),
-        data_svc_tracker));
-    ASSERT_OK(krpc_mgr_->StartServices(krpc_address_));
+    RpcMgr* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr();
+    KrpcDataStreamMgr* krpc_stream_mgr = 
ExecEnv::GetInstance()->KrpcStreamMgr();
+    ASSERT_OK(rpc_mgr->Init());
+    test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr));
+    ASSERT_OK(test_service_->Init());
+    ASSERT_OK(krpc_stream_mgr->Init(test_service_->mem_tracker()));
+    ASSERT_OK(rpc_mgr->StartServices(krpc_address_));
   }
 
   void StopThriftBackend() {
@@ -566,7 +577,7 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 
   void StopKrpcBackend() {
-    krpc_mgr_->Shutdown();
+    ExecEnv::GetInstance()->rpc_mgr()->Shutdown();
   }
 
   void StartSender(TPartitionType::type partition_type = 
TPartitionType::UNPARTITIONED,
@@ -660,7 +671,7 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 };
 
-// We use a seperate class for tests that are required to be run against 
Thrift only.
+// A seperate class for tests that are required to be run against Thrift only.
 class DataStreamTestThriftOnly : public DataStreamTest {
  protected:
   virtual void SetUp() {
@@ -672,10 +683,9 @@ class DataStreamTestThriftOnly : public DataStreamTest {
   }
 };
 
-// We need a seperate test class for IMPALA-6346, since we need to do some 
pre-SetUp()
-// work. Specifically we need to set 2 flags that will be picked up during the 
SetUp()
-// phase of the DataStreamTest class.
-class DataStreamTestForImpala6346 : public DataStreamTest {
+// A seperate test class which simulates the behavior in which deserialization 
queue
+// fills up and all deserialization threads are busy.
+class DataStreamTestShortDeserQueue : public DataStreamTest {
  protected:
   virtual void SetUp() {
     FLAGS_datastream_service_num_deserialization_threads = 1;
@@ -688,13 +698,31 @@ class DataStreamTestForImpala6346 : public DataStreamTest 
{
   }
 };
 
+// A separate test class which simulates that the service queue fills up.
+class DataStreamTestShortServiceQueue : public DataStreamTest {
+ protected:
+  virtual void SetUp() {
+    // Set the memory limit to very low to make the soft limit easy to surpass.
+    FLAGS_datastream_service_queue_mem_limit =
+        std::to_string(SHORT_SERVICE_QUEUE_MEM_LIMIT);
+    DataStreamTest::SetUp();
+  }
+
+  virtual void TearDown() {
+    DataStreamTest::TearDown();
+  }
+};
+
 INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
     ::testing::Values(USE_THRIFT, USE_KRPC));
 
 INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
     ::testing::Values(USE_THRIFT));
 
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestForImpala6346,
+INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortDeserQueue,
+    ::testing::Values(USE_KRPC));
+
+INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortServiceQueue,
     ::testing::Values(USE_KRPC));
 
 TEST_P(DataStreamTest, UnknownSenderSmallResult) {
@@ -813,7 +841,7 @@ TEST_P(DataStreamTestThriftOnly, 
CloseRecvrWhileReferencesRemain) {
 // already being deserialized will be waiting on the KrpcDataStreamMgr::lock_ 
as well.
 // But the first thread will never release the lock since it's stuck on 
Offer(), causing
 // a deadlock. This is fixed with IMPALA-6346.
-TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
+TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
 
@@ -834,7 +862,7 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, 
DEST_NODE_ID,
       4, 1024 * 1024, false, profile, &tracker_);
   info.thread_handle = new thread(
-      &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, 
&info);
+      &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, 
&info);
 
   JoinSenders();
   CheckSenders();
@@ -844,6 +872,12 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   CheckReceivers(TPartitionType::UNPARTITIONED, 4);
 }
 
+// Test that payloads larger than the service queue's soft mem limit can be 
transmitted.
+TEST_P(DataStreamTestShortServiceQueue, TestLargePayload) {
+  TestStream(
+      TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, 
false);
+}
+
 // TODO: more tests:
 // - test case for transmission error in last batch
 // - receivers getting created concurrently

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 1c3ab7a..17b7bec 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -85,11 +85,6 @@ DEFINE_bool_hidden(use_krpc, false, "Used to indicate 
whether to use KRPC for th
     "DataStream subsystem, or the Thrift RPC layer instead. Defaults to false. 
"
     "KRPC not yet supported");
 
-DEFINE_int32(datastream_service_queue_depth, 1024, "Size of datastream service 
queue");
-DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of datastream 
service "
-    "processing threads. If left at default value 0, it will be set to number 
of CPU "
-    "cores.");
-
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
@@ -179,7 +174,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int krpc_port,
 
   if (FLAGS_use_krpc) {
     VLOG_QUERY << "Using KRPC.";
-    // KRPC relies on resolved IP address. It's set in StartServices().
+    // KRPC relies on resolved IP address. It's set in Init().
     krpc_address_.__set_port(krpc_port);
     rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
     stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
@@ -318,27 +313,15 @@ Status ExecEnv::Init() {
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
 
-  // Initialize the RPCMgr before allowing services registration.
+  // Initializes the RPCMgr and DataStreamServices.
   if (FLAGS_use_krpc) {
     krpc_address_.__set_hostname(ip_address_);
+    // Initialization needs to happen in the following order due to 
dependencies:
+    // - RPC manager, DataStreamService and DataStreamManager.
     RETURN_IF_ERROR(rpc_mgr_->Init());
-
-    // Add a MemTracker for memory used to store incoming calls before they 
handed over to
-    // the data stream manager.
-    MemTracker* data_svc_tracker = obj_pool_->Add(
-        new MemTracker(-1, "Data Stream Service", mem_tracker_.get()));
-
-    // Add a MemTracker for the data stream manager, which uses it to track 
memory used by
-    // deferred RPC calls while they are buffered in the data stream manager.
-    MemTracker* stream_mgr_tracker = obj_pool_->Add(
-        new MemTracker(-1, "Data Stream Queued RPC Calls", 
mem_tracker_.get()));
-    RETURN_IF_ERROR(KrpcStreamMgr()->Init(stream_mgr_tracker, 
data_svc_tracker));
-
-    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
-    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
-        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
-    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
-        FLAGS_datastream_service_queue_depth, move(data_svc), 
data_svc_tracker));
+    data_svc_.reset(new DataStreamService());
+    RETURN_IF_ERROR(data_svc_->Init());
+    RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker()));
     // Bump thread cache to 1GB to reduce contention for TCMalloc central
     // list's spinlock.
     if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 193fdde..cd07f9b 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -43,6 +43,7 @@ class BufferPool;
 class CallableThreadPool;
 class DataStreamMgrBase;
 class DataStreamMgr;
+class DataStreamService;
 class QueryExecMgr;
 class Frontend;
 class HBaseTableFactory;
@@ -133,6 +134,7 @@ class ExecEnv {
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
   RpcMgr* rpc_mgr() const { return rpc_mgr_.get(); }
+  DataStreamService* data_svc() const { return data_svc_.get(); }
   PoolMemTrackerRegistry* pool_mem_trackers() { return 
pool_mem_trackers_.get(); }
   ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); 
}
   BufferPool* buffer_pool() { return buffer_pool_.get(); }
@@ -198,6 +200,7 @@ class ExecEnv {
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
   boost::scoped_ptr<RpcMgr> rpc_mgr_;
+  boost::scoped_ptr<DataStreamService> data_svc_;
 
   /// Query-wide buffer pool and the root reservation tracker for the pool. The
   /// reservation limit is equal to the maximum capacity of the pool. Created 
in

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc 
b/be/src/runtime/krpc-data-stream-mgr.cc
index 91111dc..4a9a91e 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -25,11 +25,13 @@
 #include "kudu/rpc/rpc_context.h"
 
 #include "exec/kudu-util.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "service/data-stream-service.h"
 #include "util/debug-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
@@ -75,10 +77,12 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 
-Status KrpcDataStreamMgr::Init(MemTracker* mem_tracker,
-    MemTracker* incoming_request_tracker) {
-  mem_tracker_ = mem_tracker;
-  incoming_request_tracker_ = incoming_request_tracker;
+Status KrpcDataStreamMgr::Init(MemTracker* service_mem_tracker) {
+  // MemTracker for tracking memory used for buffering deferred RPC calls which
+  // arrive before the receiver is ready.
+  mem_tracker_.reset(new MemTracker(-1, "Data Stream Manager Deferred RPCs",
+      ExecEnv::GetInstance()->process_mem_tracker()));
+  service_mem_tracker_ = service_mem_tracker;
   RETURN_IF_ERROR(Thread::Create("krpc-data-stream-mgr", "maintenance",
       [this](){ this->Maintenance(); }, &maintenance_thread_));
   RETURN_IF_ERROR(deserialize_pool_.Init());
@@ -134,7 +138,8 @@ shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(
   for (const unique_ptr<EndDataStreamCtx>& ctx :
       early_senders_for_recvr.closed_sender_ctxs) {
     recvr->RemoveSender(ctx->request->sender_id());
-    RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, 
mem_tracker_);
+    RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context,
+        mem_tracker_.get());
     num_senders_waiting_->Increment(-1);
   }
   return recvr;
@@ -166,10 +171,9 @@ shared_ptr<KrpcDataStreamRecvr> 
KrpcDataStreamMgr::FindRecvr(
 void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
     const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
-  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
-  incoming_request_tracker_->ReleaseLocal(
-      rpc_context->GetTransferSize(), mem_tracker_->parent());
-  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), 
mem_tracker_->parent());
+  const int64_t transfer_size = rpc_context->GetTransferSize();
+  mem_tracker_->Consume(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].waiting_sender_ctxs.emplace_back(move(payload));
@@ -180,10 +184,9 @@ void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& 
finst_id,
 void KrpcDataStreamMgr::AddEarlyClosedSender(const TUniqueId& finst_id,
     const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
-  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
-  incoming_request_tracker_->ReleaseLocal(
-      rpc_context->GetTransferSize(), mem_tracker_->parent());
-  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), 
mem_tracker_->parent());
+  const int64_t transfer_size = rpc_context->GetTransferSize();
+  mem_tracker_->Consume(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<EndDataStreamCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].closed_sender_ctxs.emplace_back(move(payload));
@@ -224,14 +227,14 @@ void KrpcDataStreamMgr::AddData(const 
TransmitDataRequestPB* request,
     // already closed deliberately, and there's no unexpected error here.
     ErrorMsg msg(TErrorCode::DATASTREAM_RECVR_CLOSED, PrintId(finst_id), 
dest_node_id);
     RespondAndReleaseRpc(Status::Expected(msg), response, rpc_context,
-        incoming_request_tracker_);
+        service_mem_tracker_);
     return;
   }
   DCHECK(recvr != nullptr);
   int64_t transfer_size = rpc_context->GetTransferSize();
   recvr->AddBatch(request, response, rpc_context);
   // Release memory. The receiver already tracks it in its instance tracker.
-  incoming_request_tracker_->Release(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
 }
 
 void KrpcDataStreamMgr::EnqueueDeserializeTask(const TUniqueId& finst_id,
@@ -279,7 +282,7 @@ void KrpcDataStreamMgr::CloseSender(const 
EndDataStreamRequestPB* request,
   // If we reach this point, either the receiver is found or it has been 
unregistered
   // already. In either cases, it's safe to just return an OK status.
   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
-  RespondAndReleaseRpc(Status::OK(), response, rpc_context, 
incoming_request_tracker_);
+  RespondAndReleaseRpc(Status::OK(), response, rpc_context, 
service_mem_tracker_);
 
   {
     // TODO: Move this to maintenance thread.
@@ -365,7 +368,7 @@ void KrpcDataStreamMgr::RespondToTimedOutSender(const 
std::unique_ptr<ContextTyp
       ctx->request->dest_node_id());
   VLOG_QUERY << msg.msg();
   RespondAndReleaseRpc(Status::Expected(msg), ctx->response, ctx->rpc_context,
-      mem_tracker_);
+      mem_tracker_.get());
   num_senders_waiting_->Increment(-1);
   num_senders_timedout_->Increment(1);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h 
b/be/src/runtime/krpc-data-stream-mgr.h
index 16c0b30..f4358ea 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -229,9 +229,11 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
  public:
   KrpcDataStreamMgr(MetricGroup* metrics);
 
-  /// Initialize the deserialization thread pool and create the maintenance 
thread.
+  /// Initializes the deserialization thread pool and creates the maintenance 
thread.
+  /// 'service_mem_tracker' is the DataStreamService's MemTracker for tracking 
memory
+  /// used for RPC payloads before being handed over to data stream manager / 
receiver.
   /// Return error status on failure. Return OK otherwise.
-  Status Init(MemTracker* mem_tracker, MemTracker* incoming_request_tracker);
+  Status Init(MemTracker* service_mem_tracker);
 
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of 
incoming row
@@ -290,18 +292,19 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
 
  private:
   friend class KrpcDataStreamRecvr;
+  friend class DataStreamTest;
 
   /// MemTracker for memory used for transmit data requests before we hand 
them over to a
   /// specific receiver. Used only to track payloads of deferred RPCs (e.g. 
early
-  /// senders). Not owned.
-  MemTracker* mem_tracker_ = nullptr;
-
-  /// MemTracker which is used by the DataStreamService to track memory for 
incoming
-  /// requests. Memory for new incoming requests is initially tracked against 
this tracker
-  /// before the requests are handed over to the data stream manager. It is 
this class's
-  /// responsibility to release memory from this tracker and track it against 
its own
-  /// tracker (here: mem_tracker_). Not owned.
-  MemTracker* incoming_request_tracker_ = nullptr;
+  /// senders).
+  std::unique_ptr<MemTracker> mem_tracker_;
+
+  /// MemTracker used by the DataStreamService to track memory for incoming 
requests.
+  /// Memory for new incoming requests is initially tracked against this 
tracker before
+  /// the requests are handed over to the data stream manager / receiver. It 
is the
+  /// responsibility of data stream manager or receiver to release memory from 
the
+  /// service's tracker and track it in their own trackers. Not owned.
+  MemTracker* service_mem_tracker_ = nullptr;
 
   /// A task for the deserialization threads to work on. The fields identify
   /// the target receiver's sender queue.

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 4228288..10a3424 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -367,6 +367,7 @@ class MemTracker {
  private:
   friend class PoolMemTrackerRegistry;
 
+  /// Returns true if the current memory tracker's limit is exceeded.
   bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < 
consumption(); }
 
   /// If consumption is higher than max_consumption, attempts to free memory 
by calling

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc 
b/be/src/service/data-stream-service.cc
index dcf0c1f..34682d4 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -17,6 +17,9 @@
 
 #include "service/data-stream-service.h"
 
+#include <climits>
+
+#include "common/constant-strings.h"
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
@@ -24,6 +27,7 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/row-batch.h"
+#include "util/parse-util.h"
 #include "testutil/fault-injection-util.h"
 
 #include "gen-cpp/data_stream_service.pb.h"
@@ -32,10 +36,35 @@
 
 using kudu::rpc::RpcContext;
 
+static const string queue_limit_msg = "(Advanced) Limit on RPC payloads 
consumption for "
+    "DataStreamService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory 
limit");
+DEFINE_string(datastream_service_queue_mem_limit, "5%", 
queue_limit_msg.c_str());
+DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for 
processing "
+    "datastream services' RPCs. If left at default value 0, it will be set to 
number of "
+    "CPU cores");
+
 namespace impala {
 
-DataStreamService::DataStreamService(RpcMgr* mgr)
-  : DataStreamServiceIf(mgr->metric_entity(), mgr->result_tracker()) {}
+DataStreamService::DataStreamService()
+  : DataStreamServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(),
+        ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) {
+  MemTracker* process_mem_tracker = 
ExecEnv::GetInstance()->process_mem_tracker();
+  bool is_percent;
+  int64_t bytes_limit = 
ParseUtil::ParseMemSpec(FLAGS_datastream_service_queue_mem_limit,
+      &is_percent, process_mem_tracker->limit());
+  mem_tracker_.reset(new MemTracker(
+      bytes_limit, "Data Stream Service Queue", process_mem_tracker));
+}
+
+Status DataStreamService::Init() {
+  int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+      FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+  // The maximum queue length is set to maximum 32-bit value. Its actual 
capacity is
+  // bound by memory consumption against 'mem_tracker_'.
+  
RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->RegisterService(num_svc_threads,
+      std::numeric_limits<int32_t>::max(), this, mem_tracker()));
+  return Status::OK();
+}
 
 void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
     EndDataStreamResponsePB* response, RpcContext* rpc_context) {

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h 
b/be/src/service/data-stream-service.h
index 7f3c6e4..63a0bf7 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -20,6 +20,9 @@
 
 #include "gen-cpp/data_stream_service.service.h"
 
+#include "common/status.h"
+#include "runtime/mem-tracker.h"
+
 namespace kudu {
 namespace rpc {
 class RpcContext;
@@ -37,7 +40,11 @@ class RpcMgr;
 /// appropriate receivers.
 class DataStreamService : public DataStreamServiceIf {
  public:
-  DataStreamService(RpcMgr* rpc_mgr);
+  DataStreamService();
+
+  /// Initializes the service by registering it with the singleton RPC manager.
+  /// This mustn't be called until RPC manager has been initialized.
+  Status Init();
 
   /// Notifies the receiver to close the data stream specified in 'request'.
   /// The receiver replies to the client with a status serialized in 
'response'.
@@ -48,6 +55,12 @@ class DataStreamService : public DataStreamServiceIf {
   /// The receiver replies to the client with a status serialized in 
'response'.
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
+
+  MemTracker* mem_tracker() { return mem_tracker_.get(); }
+
+ private:
+  /// Tracks the memory usage of the payloads in the service queue.
+  std::unique_ptr<MemTracker> mem_tracker_;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py 
b/tests/custom_cluster/test_krpc_mem_usage.py
index 07d2757..a145a7a 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -22,8 +22,8 @@ from tests.common.impala_cluster import ImpalaCluster
 from tests.common.skip import SkipIf, SkipIfBuildType
 from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 
-DATA_STREAM_MGR_METRIC = "Data Stream Queued RPC Calls"
-DATA_STREAM_SVC_METRIC = "Data Stream Service"
+DATA_STREAM_MGR_METRIC = "Data Stream Manager Deferred RPCs"
+DATA_STREAM_SVC_METRIC = "Data Stream Service Queue"
 ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
 
 @SkipIf.not_krpc

Reply via email to