IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service that uses KRPC.
As in the Thrift RPC implementation, the data stream service has three
major components: KrpcDataStreamSender serializes row batches
materialized by a fragment instance and sends them to a
KrpcDataStreamRecvr; KrpcDataStreamMgr routes each incoming row batch
to the appropriate receiver. The data stream service listens on
FLAGS_krpc_port, which defaults to 29000.

Unlike the Thrift RPC implementation, KRPC provides an asynchronous
interface for invoking remote methods. As a result, KrpcDataStreamSender
doesn't need to create a thread per connection. There is one connection
between two Impalad nodes for each direction (i.e. client and server),
and multiple queries multiplex row batch transmission on the same
connection. The asynchronous interface also avoids the possibility of a
thread being stuck in the RPC code for an extended amount of time
without checking for cancellation. A TransmitData() call with KRPC is in
essence a trio of an RpcController, a serialized protobuf request buffer
and a protobuf response buffer. The call is invoked via a
DataStreamService proxy object. The serialized tuple offsets and row
batches are sent as KRPC "sidecars" to avoid an extra copy into the
serialized request buffer.
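
For illustration, an asynchronous TransmitData() invocation roughly
takes the following shape. This is a sketch, not the literal code in
krpc-data-stream-sender.cc: the proxy type and TransmitDataAsync()
follow KRPC's generated-proxy convention, while TransmitBatchAsync(),
the generated proxy header path and the *_sidecar_idx header fields are
assumptions made for the example.

  #include <functional>

  #include "common/logging.h"        // DCHECK
  #include "kudu/rpc/rpc_controller.h"
  #include "kudu/rpc/rpc_sidecar.h"
  #include "kudu/util/slice.h"

  #include "gen-cpp/data_stream_service.pb.h"
  #include "gen-cpp/data_stream_service.proxy.h"  // generated proxy (assumed path)

  using kudu::Slice;
  using kudu::rpc::RpcController;
  using kudu::rpc::RpcSidecar;

  // One in-flight TransmitData(): the trio of RpcController, request PB and
  // response PB described above. Tuple offsets and tuple data travel as
  // sidecars so they are never copied into the serialized request buffer.
  void TransmitBatchAsync(impala::DataStreamServiceProxy* proxy,
      Slice tuple_offsets, Slice tuple_data,
      impala::TransmitDataRequestPB* req, impala::TransmitDataResponsePB* resp,
      RpcController* controller, std::function<void()> completion_cb) {
    int offsets_idx;
    int data_idx;
    // Attach the serialized row batch as sidecars (error handling elided).
    kudu::Status add_offsets = controller->AddOutboundSidecar(
        RpcSidecar::FromSlice(tuple_offsets), &offsets_idx);
    kudu::Status add_data = controller->AddOutboundSidecar(
        RpcSidecar::FromSlice(tuple_data), &data_idx);
    DCHECK(add_offsets.ok() && add_data.ok());
    // Hypothetical header fields telling the receiver which sidecar is which.
    req->mutable_row_batch_header()->set_tuple_offsets_sidecar_idx(offsets_idx);
    req->mutable_row_batch_header()->set_tuple_data_sidecar_idx(data_idx);
    // Asynchronous call: it returns immediately and 'completion_cb' runs when
    // the receiver responds, so no sender thread blocks inside the RPC layer.
    proxy->TransmitDataAsync(*req, resp, controller, completion_cb);
  }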

Each Impalad node creates a singleton DataStreamService object at
start-up time. All incoming calls are served by a service thread pool
created as part of DataStreamService. By default, the number of service
threads equals the number of logical cores. The service threads are
shared across all queries, so an RPC handler should avoid blocking as
much as possible. In the Thrift RPC implementation, the Thrift thread
handling a TransmitData() RPC blocks for an extended period of time if
the receiver has not yet been created when the call arrives. In the
KRPC implementation, TransmitData() or EndDataStream() requests which
arrive before the receiver is ready are stored in a per-receiver early
senders list in KrpcDataStreamMgr. These RPCs are processed and
responded to when the receiver is created or when a timeout occurs.
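
For illustration only, the per-receiver bookkeeping for early senders
can be pictured roughly as below. The real definitions are in
krpc-data-stream-mgr.h; the member names and container choices in this
sketch are assumptions, while TransmitDataCtx and EndDataStreamCtx are
the RPC context structs introduced by this patch.

  #include <cstdint>
  #include <memory>
  #include <vector>

  #include "util/time.h"  // MonotonicMillis()

  // Hedged sketch of one early-senders entry; one entry exists per
  // (fragment instance id, dest node id) pair, i.e. per receiver.
  struct EarlySendersList {
    // TransmitData() RPCs that arrived before their receiver was created.
    std::vector<std::unique_ptr<TransmitDataCtx>> waiting_sender_ctxs;
    // EndDataStream() RPCs that arrived before their receiver was created.
    std::vector<std::unique_ptr<EndDataStreamCtx>> closed_sender_ctxs;
    // Checked by the maintenance thread to time out senders that waited
    // longer than FLAGS_datastream_sender_timeout_ms.
    int64_t arrival_time = impala::MonotonicMillis();
  };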

Similarly, there is limited space in the sender queues in
KrpcDataStreamRecvr. If adding a row batch to a queue in
KrpcDataStreamRecvr would exceed the buffer limit, the request is
stashed in a queue for deferred processing. Stashed RPC requests are
not responded to until they are processed, so as to exert back pressure
on the senders. An alternative would be to reply with an error and
require the request and row batches to be sent again, but that may end
up consuming more network bandwidth than the Thrift RPC implementation.
This change adopts the behavior of allowing one stashed request per
sender.
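
The deferral decision that produces this back pressure looks roughly
like the sketch below. The real logic is in KrpcDataStreamRecvr
(krpc-data-stream-recvr.cc); 'deferred_rpcs_' is the queue name used in
this patch, while the other members and the helper functions are
hypothetical:

  // Hedged sketch of a receiver-side sender queue deferring an RPC.
  void SenderQueue::AddBatch(std::unique_ptr<TransmitDataCtx> ctx) {
    std::lock_guard<std::mutex> l(lock_);
    // Hypothetical helper computing the serialized size of the incoming batch.
    const int64_t batch_size = SerializedBatchSize(*ctx->request);
    if (num_buffered_bytes_ + batch_size > buffer_limit_ || !deferred_rpcs_.empty()) {
      // Over the soft limit (or earlier RPCs already deferred): stash the RPC
      // and do not respond yet. Leaving the RPC unanswered is what exerts back
      // pressure, because a sender transmits its next batch only after the
      // previous RPC has been answered; at most one RPC per sender is stashed.
      deferred_rpcs_.push(std::move(ctx));
      return;
    }
    // Room available: deserialize into the batch queue and respond right away.
    DeserializeAndEnqueue(std::move(ctx));  // hypothetical helper
  }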

All RPC requests and responses are serialized using protobuf. The
equivalent of TRowBatch is ProtoRowBatch, which contains a serialized
header describing the row batch's metadata and two Kudu Slice objects
which point to the actual data (i.e. tuple offsets and tuple data).
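
As a minimal sketch of that shape (the actual definition added by this
patch lives in be/src/runtime/row-batch.h; the member and message names
here are illustrative):

  #include "kudu/util/slice.h"

  #include "gen-cpp/row_batch.pb.h"  // RowBatchHeaderPB (assumed message name)

  struct ProtoRowBatch {
    // Serialized metadata: e.g. number of rows, per-row tuple layout,
    // uncompressed size.
    RowBatchHeaderPB header;
    // Views into the incoming RPC sidecars; no copy of the actual data is made.
    kudu::Slice tuple_offsets;
    kudu::Slice tuple_data;
  };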

This patch is based on an abandoned patch by Henry Robinson.

TESTING
-------

* Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true.

TO DO
-----

* Port some BE tests to KRPC services.

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Reviewed-on: http://gerrit.cloudera.org:8080/8023
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b4ea57a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b4ea57a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b4ea57a7

Branch: refs/heads/master
Commit: b4ea57a7e369c8c0532239db5bb95a701683a150
Parents: a772f84
Author: Michael Ho <[email protected]>
Authored: Sun Aug 20 13:53:34 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Nov 9 20:05:08 2017 +0000

----------------------------------------------------------------------
 be/src/common/status.cc                   |  34 +-
 be/src/common/status.h                    |  12 +
 be/src/exec/data-sink.cc                  |  14 +-
 be/src/exec/exchange-node.cc              |   1 +
 be/src/exec/kudu-util.h                   |  11 +-
 be/src/rpc/CMakeLists.txt                 |   2 +
 be/src/rpc/rpc-mgr.cc                     |   4 +-
 be/src/rpc/rpc-mgr.h                      |   2 +
 be/src/runtime/CMakeLists.txt             |   5 +
 be/src/runtime/data-stream-mgr-base.h     |   8 +-
 be/src/runtime/data-stream-mgr.h          |   2 +-
 be/src/runtime/data-stream-recvr.h        |   2 +-
 be/src/runtime/data-stream-sender.h       |   8 +-
 be/src/runtime/exec-env.cc                |  15 +
 be/src/runtime/exec-env.h                 |   1 +
 be/src/runtime/krpc-data-stream-mgr.cc    | 377 ++++++++++++-
 be/src/runtime/krpc-data-stream-mgr.h     | 447 ++++++++++++++-
 be/src/runtime/krpc-data-stream-recvr.cc  | 592 ++++++++++++++++++-
 be/src/runtime/krpc-data-stream-recvr.h   | 201 ++++++-
 be/src/runtime/krpc-data-stream-sender.cc | 754 +++++++++++++++++++++++++
 be/src/runtime/krpc-data-stream-sender.h  | 187 ++++++
 be/src/runtime/row-batch.cc               | 200 +++++--
 be/src/runtime/row-batch.h                | 129 ++++-
 be/src/service/CMakeLists.txt             |  17 +-
 be/src/service/data-stream-service.cc     |  53 ++
 be/src/service/data-stream-service.h      |  54 ++
 be/src/service/impala-server.cc           |   2 +-
 cmake_modules/FindProtobuf.cmake          |   2 +-
 common/protobuf/CMakeLists.txt            |  26 +-
 common/protobuf/common.proto              |  39 ++
 common/protobuf/data_stream_service.proto |  80 +++
 common/protobuf/row_batch.proto           |  39 ++
 common/thrift/generate_error_codes.py     |   3 +-
 33 files changed, 3147 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/common/status.cc
----------------------------------------------------------------------
diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index a009925..cda3e47 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -23,6 +23,7 @@
 #include "util/debug-util.h"
 
 #include "common/names.h"
+#include "gen-cpp/common.pb.h"
 #include "gen-cpp/ErrorCodes_types.h"
 
 namespace impala {
@@ -145,6 +146,10 @@ Status::Status(const TStatus& status) {
   FromThrift(status);
 }
 
+Status::Status(const StatusPB& status) {
+  FromProto(status);
+}
+
 Status& Status::operator=(const TStatus& status) {
   delete msg_;
   FromThrift(status);
@@ -203,7 +208,7 @@ const string Status::GetDetail() const {
 
 void Status::ToThrift(TStatus* status) const {
   status->error_msgs.clear();
-  if (msg_ == NULL) {
+  if (msg_ == nullptr) {
     status->status_code = TErrorCode::OK;
   } else {
     status->status_code = msg_->error();
@@ -213,6 +218,17 @@ void Status::ToThrift(TStatus* status) const {
   }
 }
 
+void Status::ToProto(StatusPB* status) const {
+  status->Clear();
+  if (msg_ == nullptr) {
+    status->set_status_code(TErrorCode::OK);
+  } else {
+    status->set_status_code(msg_->error());
+    status->add_error_msgs(msg_->msg());
+    for (const string& s : msg_->details()) status->add_error_msgs(s);
+  }
+}
+
 void Status::FromThrift(const TStatus& status) {
   if (status.status_code == TErrorCode::OK) {
     msg_ = NULL;
@@ -229,6 +245,22 @@ void Status::FromThrift(const TStatus& status) {
   }
 }
 
+void Status::FromProto(const StatusPB& status) {
+  if (status.status_code() == TErrorCode::OK) {
+    msg_ = nullptr;
+  } else {
+    msg_ = new ErrorMsg();
+    msg_->SetErrorCode(static_cast<TErrorCode::type>(status.status_code()));
+    if (status.error_msgs().size() > 0) {
+      // The first message is the actual error message. (See 
Status::ToThrift()).
+      msg_->SetErrorMsg(status.error_msgs().Get(0));
+      // The following messages are details.
+      std::for_each(status.error_msgs().begin() + 1, status.error_msgs().end(),
+          [&](string const& detail) { msg_->AddDetail(detail); });
+    }
+  }
+}
+
 void Status::FreeMessage() noexcept {
   delete msg_;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index c3b8d68..24dba8b 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -34,6 +34,8 @@
 
 namespace impala {
 
+class StatusPB;
+
 /// Status is used as a function return type to indicate success, failure or 
cancellation
 /// of the function. In case of successful completion, it only occupies 
sizeof(void*)
 /// statically allocated memory and therefore no more members should be added 
to this
@@ -151,6 +153,10 @@ class NODISCARD Status {
   /// Retains the TErrorCode value and the message
   explicit Status(const TStatus& status);
 
+  /// "Copy" c'tor from StatusPB (a protobuf serialized version of Status 
object).
+  /// Retains the TErrorCode value and the message
+  explicit Status(const StatusPB& status);
+
   /// "Copy c'tor from HS2 TStatus.
   /// Retains the TErrorCode value and the message
   explicit Status(const apache::hive::service::cli::thrift::TStatus& 
hs2_status);
@@ -240,6 +246,9 @@ class NODISCARD Status {
   /// Convert into TStatus.
   void ToThrift(TStatus* status) const;
 
+  /// Serialize into StatusPB
+  void ToProto(StatusPB* status) const;
+
   /// Returns the formatted message of the error message and the individual 
details of the
   /// additional messages as a single string. This should only be called 
internally and
   /// not to report an error back to the client.
@@ -265,6 +274,9 @@ class NODISCARD Status {
   /// A non-inline function for unwrapping a TStatus object.
   void FromThrift(const TStatus& status);
 
+  /// A non-inline function for unwrapping a StatusPB object.
+  void FromProto(const StatusPB& status);
+
   /// Status uses a naked pointer to ensure the size of an instance on the 
stack is only
   /// the sizeof(ErrorMsg*). Every Status owns its ErrorMsg instance.
   ErrorMsg* msg_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index a319860..59439cb 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -32,6 +32,7 @@
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/data-stream-sender.h"
+#include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
 
@@ -60,11 +61,14 @@ Status DataSink::Create(const TPlanFragmentCtx& 
fragment_ctx,
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream 
sink.");
 
-      // TODO: Remove DCHECK when KRPC is supported.
-      DCHECK(!FLAGS_use_krpc);
-      // TODO: figure out good buffer size based on size of output row
-      *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, 
row_desc,
-          thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024));
+      if (FLAGS_use_krpc) {
+        *sink = pool->Add(new 
KrpcDataStreamSender(fragment_instance_ctx.sender_id,
+            row_desc, thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 
1024));
+      } else {
+        // TODO: figure out good buffer size based on size of output row
+        *sink = pool->Add(new 
DataStreamSender(fragment_instance_ctx.sender_id, row_desc,
+            thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024));
+      }
       break;
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table 
sink.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index b39bcbf..353a59b 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -106,6 +106,7 @@ void ExchangeNode::Codegen(RuntimeState* state) {
 Status ExchangeNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_CANCELLED(state);
   if (is_merging_) {
     // CreateMerger() will populate its merging heap with batches from the 
stream_recvr_,
     // so it is not necessary to call FillInputRowBatch().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 5de55fe..11cf16a 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -39,11 +39,10 @@ namespace impala {
     } \
   } while (0)
 
-
 #define KUDU_ASSERT_OK(status)                                     \
   do {                                                             \
-    const Status& status_ = FromKuduStatus(status);                \
-    ASSERT_TRUE(status_.ok()) << "Error: " << status_.GetDetail(); \
+    const Status& _s = FromKuduStatus(status);                     \
+    ASSERT_TRUE(_s.ok()) << "Error: " << _s.GetDetail();           \
   } while (0)
 
 class TimestampValue;
@@ -97,8 +96,10 @@ ColumnType 
KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType typ
 inline Status FromKuduStatus(
     const kudu::Status& k_status, const std::string prepend = "") {
   if (LIKELY(k_status.ok())) return Status::OK();
-  if (prepend.empty()) return Status(k_status.ToString());
-  return Status(strings::Substitute("$0: $1", prepend, k_status.ToString()));
+  const string& err_msg = prepend.empty() ? k_status.ToString() :
+      strings::Substitute("$0: $1", prepend, k_status.ToString());
+  VLOG(1) << err_msg;
+  return Status::Expected(err_msg);
 }
 
 } /// namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 326b806..1985c6c 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -22,10 +22,12 @@ set(LIBRARY_OUTPUT_PATH 
"${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 
 # Mark the protobuf files as generated
+set_source_files_properties(${COMMON_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 
 add_library(Rpc
   authentication.cc
+  ${COMMON_PROTO_SRCS}
   rpc-mgr.cc
   rpc-trace.cc
   TAcceptQueueServer.cpp

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index f6491be..6a9d2d4 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -44,6 +44,8 @@ DEFINE_int32(num_acceptor_threads, 2,
 DEFINE_int32(num_reactor_threads, 0,
     "Number of threads dedicated to managing network IO for RPC services. If 
left at "
     "default value 0, it will be set to number of CPU cores.");
+DEFINE_int32(rpc_retry_interval_ms, 5,
+    "Time in millisecond of waiting before retrying an RPC when remote is 
busy");
 
 namespace impala {
 
@@ -72,7 +74,7 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, 
int32_t service_queu
       messenger_->RegisterService(service_pool->service_name(), service_pool),
       "Could not register service");
   service_pools_.push_back(service_pool);
-
+  VLOG_QUERY << "Registered KRPC service: " << service_pool->service_name();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index d414bb6..85f88b6 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -149,6 +149,8 @@ class RpcMgr {
     return messenger_->metric_entity();
   }
 
+  std::shared_ptr<kudu::rpc::Messenger> messenger() { return messenger_; }
+
   ~RpcMgr() {
     DCHECK_EQ(service_pools_.size(), 0)
         << "Must call Shutdown() before destroying RpcMgr";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 4d95dda..41805af 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -23,6 +23,9 @@ set(LIBRARY_OUTPUT_PATH 
"${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 # where to put generated binaries
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 
+# Mark the protobuf file as generated
+set_source_files_properties(${ROW_BATCH_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+
 add_library(Runtime
   buffered-tuple-stream.cc
   client-cache.cc
@@ -45,6 +48,7 @@ add_library(Runtime
   initial-reservations.cc
   krpc-data-stream-mgr.cc
   krpc-data-stream-recvr.cc
+  krpc-data-stream-sender.cc
   lib-cache.cc
   mem-tracker.cc
   mem-pool.cc
@@ -56,6 +60,7 @@ add_library(Runtime
   raw-value.cc
   raw-value-ir.cc
   row-batch.cc
+  ${ROW_BATCH_PROTO_SRCS}
   runtime-filter.cc
   runtime-filter-bank.cc
   runtime-filter-ir.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h 
b/be/src/runtime/data-stream-mgr-base.h
index 7886cfa..0e392e3 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
+#include "util/aligned-new.h"
 
 namespace impala {
 
@@ -35,7 +36,7 @@ class TUniqueId;
 /// TODO: This is a temporary pure virtual base class that defines the basic 
interface for
 /// 2 parallel implementations of the DataStreamMgrBase, one each for Thrift 
and KRPC.
 /// Remove this in favor of the KRPC implementation when possible.
-class DataStreamMgrBase {
+class DataStreamMgrBase : public CacheLineAligned {
  public:
   DataStreamMgrBase() {}
 
@@ -47,11 +48,6 @@ class DataStreamMgrBase {
       PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
       RuntimeProfile* profile, bool is_merging) = 0;
 
-  /// Notifies the recvr associated with the fragment/node id that the 
specified
-  /// sender has closed.
-  virtual Status CloseSender(const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int sender_id) = 0;
-
   /// Closes all receivers registered for fragment_instance_id immediately.
   virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 1e9f4b6..07f7c56 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -95,7 +95,7 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// sender has closed.
   /// Returns OK if successful, error status otherwise.
   Status CloseSender(const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
-      int sender_id) override;
+      int sender_id);
 
   /// Closes all receivers registered for fragment_instance_id immediately.
   void Cancel(const TUniqueId& fragment_instance_id) override;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h 
b/be/src/runtime/data-stream-recvr.h
index 9545f82..37e8f70 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -134,7 +134,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
   /// soft upper limit on the total amount of buffering allowed for this 
stream across
   /// all sender queues. we stop acking incoming data once the amount of 
buffered data
   /// exceeds this value
-  int total_buffer_limit_;
+  int64_t total_buffer_limit_;
 
   /// Row schema.
   const RowDescriptor* row_desc_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h 
b/be/src/runtime/data-stream-sender.h
index 8ed4bd0..598f3b9 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -95,16 +95,16 @@ class DataStreamSender : public DataSink {
   /// used to maintain metrics.
   Status SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers = 1);
 
-  /// Return total number of bytes sent in TRowBatch.data. If batches are
-  /// broadcast to multiple receivers, they are counted once per receiver.
-  int64_t GetNumDataBytesSent() const;
-
  protected:
   friend class DataStreamTest;
 
   virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
       const TDataSink& tsink, RuntimeState* state);
 
+  /// Return total number of bytes sent in TRowBatch.data. If batches are
+  /// broadcast to multiple receivers, they are counted once per receiver.
+  int64_t GetNumDataBytesSent() const;
+
  private:
   class Channel;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 20e921b..0ceb636 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -28,6 +28,7 @@
 #include "common/object-pool.h"
 #include "exec/kudu-util.h"
 #include "gen-cpp/ImpalaInternalService.h"
+#include "kudu/rpc/service_if.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
@@ -47,6 +48,7 @@
 #include "scheduling/admission-controller.h"
 #include "scheduling/request-pool-service.h"
 #include "scheduling/scheduler.h"
+#include "service/data-stream-service.h"
 #include "service/frontend.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
@@ -64,6 +66,7 @@
 #include "common/names.h"
 
 using boost::algorithm::join;
+using kudu::rpc::ServiceIf;
 using namespace strings;
 
 DEFINE_bool_hidden(use_statestore, true, "Deprecated, do not use");
@@ -81,6 +84,11 @@ DEFINE_bool_hidden(use_krpc, false, "Used to indicate 
whether to use KRPC for th
     "DataStream subsystem, or the Thrift RPC layer instead. Defaults to false. 
"
     "KRPC not yet supported");
 
+DEFINE_int32(datastream_service_queue_depth, 1024, "Size of datastream service 
queue");
+DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of datastream 
service "
+    "processing threads. If left at default value 0, it will be set to number 
of CPU "
+    "cores.");
+
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
@@ -168,6 +176,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int krpc_port,
     backend_address_(MakeNetworkAddress(hostname, backend_port)) {
 
   if (FLAGS_use_krpc) {
+    VLOG_QUERY << "Using KRPC.";
     // KRPC relies on resolved IP address. It's set in StartServices().
     krpc_address_.__set_port(krpc_port);
     rpc_mgr_.reset(new RpcMgr());
@@ -293,7 +302,13 @@ Status ExecEnv::Init() {
   // Initialize the RPCMgr before allowing services registration.
   if (FLAGS_use_krpc) {
     krpc_address_.__set_hostname(ip_address_);
+    RETURN_IF_ERROR(KrpcStreamMgr()->Init());
     RETURN_IF_ERROR(rpc_mgr_->Init());
+    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
+    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
+        FLAGS_datastream_service_queue_depth, move(data_svc)));
   }
 
   mem_tracker_.reset(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 416f855..df0d926 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -125,6 +125,7 @@ class ExecEnv {
   RequestPoolService* request_pool_service() { return 
request_pool_service_.get(); }
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
+  RpcMgr* rpc_mgr() const { return rpc_mgr_.get(); }
   PoolMemTrackerRegistry* pool_mem_trackers() { return 
pool_mem_trackers_.get(); }
   ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); 
}
   BufferPool* buffer_pool() { return buffer_pool_.get(); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc 
b/be/src/runtime/krpc-data-stream-mgr.cc
index a3ed417..b70bca6 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -17,41 +17,376 @@
 
 #include "runtime/krpc-data-stream-mgr.h"
 
-#include "common/logging.h"
+#include <iostream>
+#include <boost/functional/hash.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/thread.hpp>
+
+#include "kudu/rpc/rpc_context.h"
+
+#include "runtime/krpc-data-stream-recvr.h"
+#include "runtime/raw-value.inline.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+#include "util/periodic-counter-updater.h"
+#include "util/runtime-profile-counters.h"
+#include "util/uid-util.h"
+
+#include "gen-cpp/data_stream_service.pb.h"
+
+#include "common/names.h"
+
+/// This parameter controls the minimum amount of time a closed stream ID will 
stay in
+/// closed_stream_cache_ before it is evicted. It needs to be set sufficiently 
high that
+/// it will outlive all the calls to FindRecvr() for that stream ID, to 
distinguish
+/// between was-here-but-now-gone and never-here states for the receiver. If 
the stream
+/// ID expires before a call to FindRecvr(), the sender will see an error 
which will lead
+/// to query cancellation. Setting this value higher will increase the size of 
the stream
+/// cache (which is roughly 48 bytes per receiver).
+/// TODO: We don't need millisecond precision here.
+const int32_t STREAM_EXPIRATION_TIME_MS = 300 * 1000;
 
 DECLARE_bool(use_krpc);
+DECLARE_int32(datastream_sender_timeout_ms);
+DEFINE_int32(datastream_service_num_deserialization_threads, 16,
+    "Number of threads for deserializing RPC requests deferred due to the 
receiver "
+    "not ready or the soft limit of the receiver is reached.");
+
+using boost::mutex;
 
 namespace impala {
 
-[[noreturn]] static void AbortUnsupportedFeature() {
-  // We should have gotten here only if the FLAGS_use_krpc is set to true.
-  CHECK(FLAGS_use_krpc) << "Shouldn't reach here unless startup flag 
'use_krpc' "
-      "is true.";
-  // KRPC isn't supported yet, so abort.
-  ABORT_WITH_ERROR("KRPC is not supported yet. Please set the 'use_krpc' flag 
to "
-      "false and restart the cluster.");
+KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
+  : deserialize_pool_("data-stream-mgr", "deserialize",
+      FLAGS_datastream_service_num_deserialization_threads, 10000,
+      boost::bind(&KrpcDataStreamMgr::DeserializeThreadFn, this, _1, _2)) {
+  MetricGroup* dsm_metrics = 
metrics->GetOrCreateChildGroup("datastream-manager");
+  num_senders_waiting_ =
+      dsm_metrics->AddGauge<int64_t>("senders-blocked-on-recvr-creation", 0L);
+  total_senders_waited_ =
+      
dsm_metrics->AddCounter<int64_t>("total-senders-blocked-on-recvr-creation", 0L);
+  num_senders_timedout_ = dsm_metrics->AddCounter<int64_t>(
+      "total-senders-timedout-waiting-for-recvr-creation", 0L);
+}
+
+Status KrpcDataStreamMgr::Init() {
+  RETURN_IF_ERROR(Thread::Create("krpc-data-stream-mgr", "maintenance",
+      [this](){ this->Maintenance(); }, &maintenance_thread_));
+  RETURN_IF_ERROR(deserialize_pool_.Init());
+  return Status::OK();
+}
+
+inline uint32_t KrpcDataStreamMgr::GetHashValue(
+    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id) {
+  uint32_t value = RawValue::GetHashValue(&fragment_instance_id.lo, 
TYPE_BIGINT, 0);
+  value = RawValue::GetHashValue(&fragment_instance_id.hi, TYPE_BIGINT, value);
+  value = RawValue::GetHashValue(&dest_node_id, TYPE_INT, value);
+  return value;
+}
+
+shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
+    RuntimeState* state, const RowDescriptor* row_desc,
+    const TUniqueId& finst_id, PlanNodeId dest_node_id, int num_senders,
+    int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
+
+  DCHECK(profile != nullptr);
+  VLOG_FILE << "creating receiver for fragment="<< finst_id
+            << ", node=" << dest_node_id;
+  shared_ptr<KrpcDataStreamRecvr> recvr(
+      new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
+          finst_id, dest_node_id, num_senders, is_merging, buffer_size, 
profile));
+  uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  {
+    RecvrId recvr_id = make_pair(finst_id, dest_node_id);
+    lock_guard<mutex> l(lock_);
+    fragment_recvr_set_.insert(recvr_id);
+    receiver_map_.insert(make_pair(hash_value, recvr));
+
+    EarlySendersMap::iterator it = early_senders_map_.find(recvr_id);
+    if (it != early_senders_map_.end()) {
+      EarlySendersList& early_senders = it->second;
+      // Let the receiver take over the RPC payloads of early senders and 
process them
+      // asynchronously.
+      for (unique_ptr<TransmitDataCtx>& ctx : 
early_senders.waiting_sender_ctxs) {
+        recvr->TakeOverEarlySender(move(ctx));
+        num_senders_waiting_->Increment(-1);
+      }
+      for (const unique_ptr<EndDataStreamCtx>& ctx : 
early_senders.closed_sender_ctxs) {
+        recvr->RemoveSender(ctx->request->sender_id());
+        Status::OK().ToProto(ctx->response->mutable_status());
+        ctx->rpc_context->RespondSuccess();
+        num_senders_waiting_->Increment(-1);
+      }
+      early_senders_map_.erase(it);
+    }
+  }
+  return recvr;
+}
+
+shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::FindRecvr(
+    const TUniqueId& finst_id, PlanNodeId dest_node_id, bool* 
already_unregistered) {
+  VLOG_ROW << "looking up fragment_instance_id=" << finst_id
+           << ", node=" << dest_node_id;
+  *already_unregistered = false;
+  uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  pair<RecvrMap::iterator, RecvrMap::iterator> range =
+      receiver_map_.equal_range(hash_value);
+  while (range.first != range.second) {
+    shared_ptr<KrpcDataStreamRecvr> recvr = range.first->second;
+    if (recvr->fragment_instance_id() == finst_id &&
+        recvr->dest_node_id() == dest_node_id) {
+      return recvr;
+    }
+    ++range.first;
+  }
+  RecvrId recvr_id = make_pair(finst_id, dest_node_id);
+  if (closed_stream_cache_.find(recvr_id) != closed_stream_cache_.end()) {
+    *already_unregistered = true;
+  }
+  return shared_ptr<KrpcDataStreamRecvr>();
+}
+
+void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
+    const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+    kudu::rpc::RpcContext* rpc_context) {
+  RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
+  auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
+  early_senders_map_[recvr_id].waiting_sender_ctxs.emplace_back(move(payload));
+  num_senders_waiting_->Increment(1);
+  total_senders_waited_->Increment(1);
 }
 
-[[noreturn]] KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamMgr::AddEarlyClosedSender(const TUniqueId& finst_id,
+    const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
+    kudu::rpc::RpcContext* rpc_context) {
+  RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
+  auto payload = make_unique<EndDataStreamCtx>(request, response, rpc_context);
+  early_senders_map_[recvr_id].closed_sender_ctxs.emplace_back(move(payload));
+  num_senders_waiting_->Increment(1);
+  total_senders_waited_->Increment(1);
 }
 
-KrpcDataStreamMgr::~KrpcDataStreamMgr(){}
+void KrpcDataStreamMgr::AddData(const TransmitDataRequestPB* request,
+    TransmitDataResponsePB* response, kudu::rpc::RpcContext* rpc_context) {
+  TUniqueId finst_id;
+  finst_id.__set_lo(request->dest_fragment_instance_id().lo());
+  finst_id.__set_hi(request->dest_fragment_instance_id().hi());
+  TPlanNodeId dest_node_id = request->dest_node_id();
+  VLOG_ROW << "AddData(): finst_id=" << PrintId(finst_id)
+           << " node_id=" << request->dest_node_id()
+           << " #rows=" << request->row_batch_header().num_rows()
+           << " sender_id=" << request->sender_id();
+  bool already_unregistered = false;
+  shared_ptr<KrpcDataStreamRecvr> recvr;
+  {
+    lock_guard<mutex> l(lock_);
+    recvr = FindRecvr(finst_id, request->dest_node_id(), 
&already_unregistered);
+    // If no receiver is found and it's not in the closed stream cache, best 
guess is
+    // that it is still preparing, so add payload to per-receiver early 
senders' list.
+    // If the receiver doesn't show up after 
FLAGS_datastream_sender_timeout_ms ms
+    // (e.g. if the receiver was closed and has already been retired from the
+    // closed_stream_cache_), the sender is timed out by the maintenance 
thread.
+    if (!already_unregistered && recvr == nullptr) {
+      AddEarlySender(finst_id, request, response, rpc_context);
+      return;
+    }
+  }
+  if (already_unregistered) {
+    // The receiver may remove itself from the receiver map via 
DeregisterRecvr() at any
+    // time without considering the remaining number of senders. As a 
consequence,
+    // FindRecvr() may return nullptr even though the receiver was once 
present. We
+    // detect this case by checking already_unregistered - if true then the 
receiver was
+    // already closed deliberately, and there's no unexpected error here.
+    Status(TErrorCode::DATASTREAM_RECVR_CLOSED, PrintId(finst_id), 
dest_node_id)
+        .ToProto(response->mutable_status());
+    rpc_context->RespondSuccess();
+    return;
+  }
+  DCHECK(recvr != nullptr);
+  recvr->AddBatch(request, response, rpc_context);
+}
 
-    [[noreturn]] std::shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(
-        RuntimeState* state, const RowDescriptor* row_desc,
-        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
-        int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamMgr::EnqueueDeserializeTask(const TUniqueId& finst_id,
+    PlanNodeId dest_node_id, int sender_id, int num_requests) {
+  for (int i = 0; i < num_requests; ++i) {
+    DeserializeTask payload = {finst_id, dest_node_id, sender_id};
+    deserialize_pool_.Offer(move(payload));
+  }
 }
 
-[[noreturn]] Status KrpcDataStreamMgr::CloseSender(const TUniqueId& 
fragment_instance_id,
-    PlanNodeId dest_node_id, int sender_id) {
-  AbortUnsupportedFeature();
+void KrpcDataStreamMgr::DeserializeThreadFn(int thread_id, const 
DeserializeTask& task) {
+  shared_ptr<KrpcDataStreamRecvr> recvr;
+  {
+    bool already_unregistered;
+    lock_guard<mutex> l(lock_);
+    recvr = FindRecvr(task.finst_id, task.dest_node_id, &already_unregistered);
+    DCHECK(recvr != nullptr || already_unregistered);
+  }
+  if (recvr != nullptr) recvr->DequeueDeferredRpc(task.sender_id);
+}
+
+void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
+    EndDataStreamResponsePB* response, kudu::rpc::RpcContext* rpc_context) {
+  TUniqueId finst_id;
+  finst_id.__set_lo(request->dest_fragment_instance_id().lo());
+  finst_id.__set_hi(request->dest_fragment_instance_id().hi());
+  VLOG_ROW << "CloseSender(): instance_id=" << PrintId(finst_id)
+           << " node_id=" << request->dest_node_id()
+           << " sender_id=" << request->sender_id();
+  shared_ptr<KrpcDataStreamRecvr> recvr;
+  {
+    lock_guard<mutex> l(lock_);
+    bool already_unregistered;
+    recvr = FindRecvr(finst_id, request->dest_node_id(), 
&already_unregistered);
+    // If no receiver is found and it's not in the closed stream cache, we 
still need
+    // to make sure that the close operation is performed so add to per-recvr 
list of
+    // pending closes. It's possible for a sender to issue EOS RPC without 
sending any
+    // rows if no rows are materialized at all in the sender side.
+    if (!already_unregistered && recvr == nullptr) {
+      AddEarlyClosedSender(finst_id, request, response, rpc_context);
+      return;
+    }
+  }
+
+  // If we reach this point, either the receiver is found or it has been 
unregistered
+  // already. In either cases, it's safe to just return an OK status.
+  if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
+  Status::OK().ToProto(response->mutable_status());
+  rpc_context->RespondSuccess();
+
+  {
+    // TODO: Move this to maintenance thread.
+    // Remove any closed streams that have been in the cache for more than
+    // STREAM_EXPIRATION_TIME_MS.
+    lock_guard<mutex> l(lock_);
+    ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
+    int64_t now = MonotonicMillis();
+    int32_t before = closed_stream_cache_.size();
+    while (it != closed_stream_expirations_.end() && it->first < now) {
+      closed_stream_cache_.erase(it->second);
+      closed_stream_expirations_.erase(it++);
+    }
+    DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
+    int32_t after = closed_stream_cache_.size();
+    if (before != after) {
+      VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " 
<< after
+                 << ", eviction took: "
+                 << PrettyPrinter::Print(MonotonicMillis() - now, 
TUnit::TIME_MS);
+    }
+  }
+}
+
+Status KrpcDataStreamMgr::DeregisterRecvr(
+    const TUniqueId& finst_id, PlanNodeId dest_node_id) {
+  VLOG_QUERY << "DeregisterRecvr(): fragment_instance_id=" << finst_id
+             << ", node=" << dest_node_id;
+  uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
+  lock_guard<mutex> l(lock_);
+  pair<RecvrMap::iterator, RecvrMap::iterator> range =
+      receiver_map_.equal_range(hash_value);
+  while (range.first != range.second) {
+    const shared_ptr<KrpcDataStreamRecvr>& recvr = range.first->second;
+    if (recvr->fragment_instance_id() == finst_id &&
+        recvr->dest_node_id() == dest_node_id) {
+      // Notify concurrent AddData() requests that the stream has been 
terminated.
+      recvr->CancelStream();
+      RecvrId recvr_id =
+          make_pair(recvr->fragment_instance_id(), recvr->dest_node_id());
+      fragment_recvr_set_.erase(recvr_id);
+      receiver_map_.erase(range.first);
+      closed_stream_expirations_.insert(
+          make_pair(MonotonicMillis() + STREAM_EXPIRATION_TIME_MS, recvr_id));
+      closed_stream_cache_.insert(recvr_id);
+      return Status::OK();
+    }
+    ++range.first;
+  }
+
+  const string msg = Substitute(
+      "Unknown row receiver id: fragment_instance_id=$0, dest_node_id=$1",
+      PrintId(finst_id), dest_node_id);
+  return Status(msg);
+}
+
+void KrpcDataStreamMgr::Cancel(const TUniqueId& finst_id) {
+  VLOG_QUERY << "cancelling all streams for fragment=" << finst_id;
+  lock_guard<mutex> l(lock_);
+  FragmentRecvrSet::iterator iter =
+      fragment_recvr_set_.lower_bound(make_pair(finst_id, 0));
+  while (iter != fragment_recvr_set_.end() && iter->first == finst_id) {
+    bool unused;
+    shared_ptr<KrpcDataStreamRecvr> recvr = FindRecvr(iter->first, 
iter->second, &unused);
+    if (recvr != nullptr) {
+      recvr->CancelStream();
+    } else {
+      // keep going but at least log it
+      LOG(ERROR) << Substitute("Cancel(): missing in stream_map: fragment=$0 
node=$1",
+          PrintId(iter->first), iter->second);
+    }
+    ++iter;
+  }
+}
+
+template<typename ContextType, typename RequestPBType>
+void KrpcDataStreamMgr::RespondToTimedOutSender(const 
std::unique_ptr<ContextType>& ctx) {
+  const RequestPBType* request = ctx->request;
+  TUniqueId finst_id;
+  finst_id.__set_lo(request->dest_fragment_instance_id().lo());
+  finst_id.__set_hi(request->dest_fragment_instance_id().hi());
+
+  Status(TErrorCode::DATASTREAM_SENDER_TIMEOUT, PrintId(finst_id)).ToProto(
+      ctx->response->mutable_status());
+  ctx->rpc_context->RespondSuccess();
+  num_senders_waiting_->Increment(-1);
+  num_senders_timedout_->Increment(1);
+}
+
+void KrpcDataStreamMgr::Maintenance() {
+  while (true) {
+    // Notify any senders that have been waiting too long for their receiver to
+    // appear. Keep lock_ held for only a short amount of time.
+    vector<EarlySendersList> timed_out_senders;
+    {
+      int64_t now = MonotonicMillis();
+      lock_guard<mutex> l(lock_);
+      auto it = early_senders_map_.begin();
+      while (it != early_senders_map_.end()) {
+        if (now - it->second.arrival_time > 
FLAGS_datastream_sender_timeout_ms) {
+          timed_out_senders.emplace_back(move(it->second));
+          it = early_senders_map_.erase(it);
+        } else {
+          ++it;
+        }
+      }
+    }
+
+    // Send responses to all timed-out senders. We need to propagate the 
time-out errors
+    // to senders which sent EOS RPC so all query fragments will eventually be 
cancelled.
+    // Otherwise, the receiver may hang when it eventually gets created as the 
timed-out
+    // EOS will be lost forever.
+    for (const EarlySendersList& senders_queue : timed_out_senders) {
+      for (const unique_ptr<TransmitDataCtx>& ctx : 
senders_queue.waiting_sender_ctxs) {
+        RespondToTimedOutSender<TransmitDataCtx, TransmitDataRequestPB>(ctx);
+      }
+      for (const unique_ptr<EndDataStreamCtx>& ctx : 
senders_queue.closed_sender_ctxs) {
+        RespondToTimedOutSender<EndDataStreamCtx, EndDataStreamRequestPB>(ctx);
+      }
+    }
+    bool timed_out = false;
+    // Wait for 10s
+    shutdown_promise_.Get(10000, &timed_out);
+    if (!timed_out) return;
+  }
 }
 
-[[noreturn]] void KrpcDataStreamMgr::Cancel(const TUniqueId& 
fragment_instance_id) {
-  AbortUnsupportedFeature();
+KrpcDataStreamMgr::~KrpcDataStreamMgr() {
+  shutdown_promise_.Set(true);
+  deserialize_pool_.Shutdown();
+  LOG(INFO) << "Waiting for data-stream-mgr maintenance thread...";
+  maintenance_thread_->Join();
+  LOG(INFO) << "Waiting for deserialization thread pool...";
+  deserialize_pool_.Join();
 }
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h 
b/be/src/runtime/krpc-data-stream-mgr.h
index ef9bb45..5171e80 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -20,33 +20,458 @@
 
 #include "runtime/data-stream-mgr-base.h"
 
+#include <list>
+#include <queue>
+#include <set>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/unordered_map.hpp>
+#include <boost/unordered_set.hpp>
+
 #include "common/status.h"
+#include "common/object-pool.h"
+#include "runtime/data-stream-mgr-base.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
+#include "runtime/row-batch.h"
+#include "util/metrics.h"
+#include "util/promise.h"
+#include "util/runtime-profile.h"
+#include "util/thread-pool.h"
+#include "gen-cpp/Types_types.h"  // for TUniqueId
+
+#include "gutil/macros.h"
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
 
 namespace impala {
 
-class DataStreamRecvrBase;
-class MetricGroup;
-class RuntimeProfile;
+class DescriptorTbl;
+class EndDataStreamRequestPB;
+class EndDataStreamResponsePB;
+class KrpcDataStreamRecvr;
 class RuntimeState;
-class TRowBatch;
+class TransmitDataRequestPB;
+class TransmitDataResponsePB;
+
+/// TRANSMIT DATA PROTOCOL
+/// ----------------------
+///
+/// Impala daemons send tuple data between themselves using a transmission 
protocol that
+/// is managed by DataStreamMgr and related classes. Batches of tuples are 
sent between
+/// fragment instances using the TransmitData() RPC; The data transmitted are 
usually sent
+/// in batches across multiple RPCs. The logical connection between a pair of 
client and
+/// server is known as a 'channel'. Clients and servers are referred to as 
'senders' and
+/// 'receivers' and are implemented by DataStreamSender and DataStreamRecvr 
respectively.
+/// Please note that the number of senders and number of receivers in a stream 
aren't
+/// necessarily the same. We refer to the on-going transmissions between m 
senders and n
+/// receivers as an 'm:n data stream'.
+///
+/// DataStreamMgr is a singleton class that lives for a long as the Impala 
process, and
+/// manages all streams for all queries. DataStreamRecvr and DataStreamSender 
have
+/// lifetimes tied to their containing fragment instances.
+///
+/// The protocol proceeds in three phases.
+///
+/// Phase 1: Channel establishment
+/// ------------------------------
+///
+/// In the first phase the sender initiates a channel with a receiver by 
sending its
+/// first batch. Since the sender may start sending before the receiver is 
ready, the data
+/// stream manager waits until the receiver has finished initialization and 
then passes
+/// the sender's request to the receiver. If the receiver does not appear 
within a
+/// configurable timeout, the data stream manager notifies the sender directly 
by
+/// returning DATASTREAM_SENDER_TIMEOUT. Note that a sender may have multiple 
channels,
+/// each of which needs to be initialized with the corresponding receiver.
+///
+/// The sender does not distinguish this phase from the steady-state data 
transmission
+/// phase, so may time-out etc. as described below.
+///
+/// Phase 2: Data transmission
+/// --------------------------
+///
+/// After the first batch has been received, a sender continues to send 
batches, one at
+/// a time (so only one TransmitData() RPC per sender is pending completion at 
any one
+/// time). The rate of transmission is controlled by the receiver: a sender 
will only
+/// schedule batch transmission when the previous transmission completes 
successfully.
+/// When a batch is received, a receiver will do one of two things: (1) 
deserializes it
+/// immediately and adds it to 'batch queue' or (2) defers the deserialization 
and respond
+/// to the RPC later if the batch queue is full. In the second case, the RPC 
state is
+/// saved into the receiver's 'deferred_rpcs_' queue. When space becomes 
available in the
+/// batch queue, the longest-waiting RPC is removed from the 'deferred_rpcs_' 
queue and
+/// the row batch is deserialized. In both cases, the RPC is replied to when 
the batch
+/// has been deserialized and added to the batch queue. The sender will then 
send its
+/// next batch.
+///
+/// Phase 3: End of stream
+/// ----------------------
+///
+/// When the stream is terminated, clients will send EndDataStream() RPCs to 
the servers.
+/// This RPC will not be sent until after the final TransmitData() RPC has 
completed and
+/// the stream's contents has been delivered. After EndDataStream() is 
received, no more
+/// TransmitData() RPCs should be expected from this sender.
+///
+/// Exceptional conditions: cancellation, timeouts, failure
+/// -------------------------------------------------------
+///
+/// The protocol must deal with the following complications: asynchronous 
cancellation of
+/// either the receiver or sender, timeouts during RPC transmission, and 
failure of either
+/// the receiver or sender.
+///
+/// 1. Cancellation
+///
+/// If the receiver is cancelled (or closed for any other reason, like 
reaching a limit)
+/// before the sender has completed the stream it will be torn down 
immediately. Any
+/// incomplete senders may not be aware of this, and will continue to send 
batches. The
+/// data stream manager on the receiver keeps a record of recently completed 
receivers so
+/// that it may intercept the 'late' data transmissions and immediately reject 
them with
+/// an error that signals the sender should terminate. The record is removed 
after a
+/// certain period of time.
+///
+/// It's possible for the closed receiver record to be removed before all 
senders have
+/// completed. It is usual that the coordinator will initiate cancellation 
(e.g. the
+/// query is unregistered after initial result rows are fetched once the limit 
is hit).
+/// before the timeout period expires so the sender will be cancelled already. 
However,
+/// it can also occur that the query may not complete before the timeout has 
elapsed.
+/// A sender which sends a row batch after the timeout has elapsed may hit 
time-out and
+/// fail the query. This problem is being tracked in IMPALA-3990.
+///
+/// The sender RPCs are sent asynchronously to the main thread of fragment 
instance
+/// execution. Senders do not block in TransmitData() RPCs, and may be 
cancelled at any
+/// time. If an in-flight RPC is cancelled at the sender side, the reply from 
the receiver
+/// will be silently dropped by the RPC layer.
+///
+/// 2. Timeouts during RPC transmission
+///
+/// Since RPCs may be arbitrarily delayed in the pending sender queue, the 
TransmitData()
+/// RPC has no RPC-level timeout. Instead, the receiver returns an error to 
the sender if
+/// a timeout occurs during the initial channel establishment phase. Since the
+/// TransmitData() RPC is asynchronous from the sender, the sender may 
continue to check
+/// for cancellation while it is waiting for a response from the receiver.
+///
+/// 3. Node or instance failure
+///
+/// If the receiver node fails, RPCs will fail fast and the sending fragment 
instance will
+/// be cancelled.
+///
+/// If a sender node fails, or the receiver node hangs, the coordinator should 
detect the
+/// failure and cancel all fragments.
+///
+/// TODO: Fix IMPALA-3990: use non-timed based approach for removing the 
closed stream
+/// receiver.
+///
+
+/// Context for a TransmitData() RPC. This structure is constructed when the 
processing of
+/// a RPC is deferred because the receiver isn't prepared or the 'batch_queue' 
is full.
+struct TransmitDataCtx {
+  /// Request data structure, memory owned by 'rpc_context'. This contains 
various info
+  /// such as the destination finst ID, plan node ID and the row batch header.
+  const TransmitDataRequestPB* request;
+
+  /// Response data structure, will be serialized back to client after 
'rpc_context' is
+  /// responded to.
+  TransmitDataResponsePB* response;
+
+  /// RpcContext owns the memory of all data structures related to the 
incoming RPC call
+  /// such as the serialized request buffer, response buffer and any sidecars. 
Must be
+  /// responded to once this RPC is finished with. RpcContext will delete 
itself once it
+  /// has been responded to. Not owned.
+  kudu::rpc::RpcContext* rpc_context;
+
+  TransmitDataCtx(const TransmitDataRequestPB* request, 
TransmitDataResponsePB* response,
+      kudu::rpc::RpcContext* rpc_context)
+    : request(request), response(response), rpc_context(rpc_context) { }
+};
+
+/// Context for an EndDataStream() RPC. This structure is constructed when the 
RPC is
+/// queued by the data stream manager for deferred processing when the 
receiver isn't
+/// prepared.
+struct EndDataStreamCtx {
+  /// Request data structure, memory owned by 'rpc_context'.
+  const EndDataStreamRequestPB* request;
+
+  /// Response data structure, will be serialized back to client after 
'rpc_context' is
+  /// responded to. Memory owned by 'rpc_context'.
+  EndDataStreamResponsePB* response;
+
+  /// Must be responded to once this RPC is finished with. RpcContext will 
delete itself
+  /// once it has been responded to. Not owned.
+  kudu::rpc::RpcContext* rpc_context;
 
+  EndDataStreamCtx(const EndDataStreamRequestPB* request,
+      EndDataStreamResponsePB* response, kudu::rpc::RpcContext* rpc_context)
+    : request(request), response(response), rpc_context(rpc_context) { }
+};
+
+/// Singleton class which manages all incoming data streams at a backend node.
+/// It provides both producer and consumer functionality for each data stream.
+///
+/// - RPC service threads use this to add incoming data to streams in response 
to
+///   TransmitData() RPCs (AddData()) or to signal end-of-stream conditions
+///   (CloseSender()).
+/// - Exchange nodes extract data from an incoming stream via a 
KrpcDataStreamRecvr,
+///   which is created with CreateRecvr().
+//
+/// DataStreamMgr also allows asynchronous cancellation of streams via Cancel()
+/// which unblocks all KrpcDataStreamRecvr::GetBatch() calls that are made on 
behalf
+/// of the cancelled fragment id.
+///
+/// Exposes three metrics:
+///  'senders-blocked-on-recvr-creation' - currently blocked senders.
+///  'total-senders-blocked-on-recvr-creation' - total number of blocked 
senders over
+///  time.
+///  'total-senders-timedout-waiting-for-recvr-creation' - total number of 
senders that
+///  timed-out while waiting for a receiver.
+///
+/// TODO: The recv buffers used in KrpcDataStreamRecvr should count against
+/// per-query memory limits.
 class KrpcDataStreamMgr : public DataStreamMgrBase {
  public:
-  [[noreturn]] KrpcDataStreamMgr(MetricGroup* metrics);
-  virtual ~KrpcDataStreamMgr() override;
+  KrpcDataStreamMgr(MetricGroup* metrics);
+
+  /// Initialize the deserialization thread pool and create the maintenance 
thread.
+  /// Return error status on failure. Return OK otherwise.
+  Status Init();
 
-  [[noreturn]] std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* 
state,
+  /// Create a receiver for a specific fragment_instance_id/dest_node_id.
+  /// If is_merging is true, the receiver maintains a separate queue of 
incoming row
+  /// batches for each sender and merges the sorted streams from each sender 
into a
+  /// single stream.
+  /// Ownership of the receiver is shared between this DataStream mgr instance 
and the
+  /// caller.
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
       RuntimeProfile* profile, bool is_merging) override;
 
-  [[noreturn]] Status CloseSender(const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int sender_id) override;
+  /// Handler for TransmitData() RPC.
+  ///
+  /// Adds the serialized row batch pointed to by 'request' and 'rpc_context' to the
+  /// receiver identified by the fragment instance id, dest node id and sender id
+  /// specified in 'request'. If the receiver is not yet ready, the processing of
+  /// 'request' is deferred until the receiver is ready or the request times out. If
+  /// the receiver has already been torn down (within the last
+  /// STREAM_EXPIRATION_TIME_MS), the RPC will be responded to immediately. Otherwise,
+  /// the sender will be responded to when the timeout occurs.
+  ///
+  /// 'response' is the reply to the caller and the status for deserializing the row
+  /// batch should be added to it; 'rpc_context' holds the payload of the incoming RPC
+  /// call. It owns the memory pointed to by 'request', 'response' and the RPC
+  /// sidecars. The request together with the RPC sidecars makes up the serialized row
+  /// batch.
+  ///
+  /// If the stream would exceed its buffering limit as a result of queuing this batch,
+  /// the batch is deferred for processing later by the deserialization thread pool.
+  ///
+  /// The RPC may not be responded to by the time this function returns if the
+  /// processing is deferred.
+  ///
+  /// TODO: enforce per-sender quotas (something like 200% of buffer_size/#senders),
+  /// so that a single sender can't flood the buffer and stall everybody else.
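+  ///
+  /// A possible handler flow, sketched here for illustration only (the receiver-side
+  /// call and the extraction of ids from 'request' are assumptions; the actual logic
+  /// lives in the .cc file):
+  ///
+  ///   // finst_id / dest_node_id are taken from 'request'.
+  ///   shared_ptr<KrpcDataStreamRecvr> recvr;
+  ///   bool already_unregistered;
+  ///   {
+  ///     lock_guard<mutex> l(lock_);
+  ///     recvr = FindRecvr(finst_id, dest_node_id, &already_unregistered);
+  ///     if (recvr == nullptr && !already_unregistered) {
+  ///       // Receiver not ready yet: park the request; respond when it shows up
+  ///       // or when the wait times out.
+  ///       AddEarlySender(finst_id, request, response, rpc_context);
+  ///       return;
+  ///     }
+  ///   }
+  ///   if (recvr != nullptr) {
+  ///     recvr->AddBatch(request, response, rpc_context);  // may defer if queue is full
+  ///   } else {
+  ///     // Stream was torn down recently: respond now so the sender fails fast.
+  ///     rpc_context->RespondSuccess();
+  ///   }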
+  void AddData(const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      kudu::rpc::RpcContext* rpc_context);
+
+  /// Handler for EndDataStream() RPC.
+  ///
+  /// Notifies the receiver associated with the fragment/dest_node id that the
+  /// specified sender has closed. The RPC will be responded to if the receiver is
+  /// found. Otherwise, the request will be queued in the early senders list and
+  /// responded to either when the receiver is created or when the request has timed
+  /// out.
+  void CloseSender(const EndDataStreamRequestPB* request,
+      EndDataStreamResponsePB* response, kudu::rpc::RpcContext* context);
+
+  /// Cancels all receivers registered for fragment_instance_id immediately. The
+  /// receivers will not accept any row batches after being cancelled. Any buffered
+  /// row batches will not be freed until Close() is called on the receivers.
+  void Cancel(const TUniqueId& fragment_instance_id) override;
+
+  /// Waits for maintenance thread and sender response thread pool to finish.
+  ~KrpcDataStreamMgr();
+
+ private:
+  friend class KrpcDataStreamRecvr;
+
+  /// A task for the deserialization threads to work on. The fields identify
+  /// the target receiver's sender queue.
+  struct DeserializeTask {
+    /// The receiver's fragment instance id.
+    TUniqueId finst_id;
+
+    /// The plan node id of the exchange node owning the receiver.
+    PlanNodeId dest_node_id;
+
+    /// Sender id used for identifying the sender queue for merging exchange.
+    int sender_id;
+  };
+
+  /// Set of threads which deserialize buffered row batches, and deliver them to their
+  /// receivers. Used only if RPCs were deferred when their channel's batch queue was
+  /// full or if the receiver was not yet prepared.
+  ThreadPool<DeserializeTask> deserialize_pool_;
+
+  /// Periodically, respond to all senders that have waited for too long for their
+  /// receivers to show up.
+  std::unique_ptr<Thread> maintenance_thread_;
+
+  /// Used to notify maintenance_thread_ that it should exit.
+  Promise<bool> shutdown_promise_;
+
+  /// Current number of senders waiting for a receiver to register
+  IntGauge* num_senders_waiting_;
+
+  /// Total number of senders that have ever waited for a receiver to register
+  IntCounter* total_senders_waited_;
+
+  /// Total number of senders that timed-out waiting for a receiver to register
+  IntCounter* num_senders_timedout_;
+
+  /// protects all fields below
+  boost::mutex lock_;
+
+  /// Map from hash value of fragment instance id/node id pair to stream receivers;
+  /// Ownership of the stream recvr is shared between this instance and the caller of
+  /// CreateRecvr().
+  /// We don't want to create a map<pair<TUniqueId, PlanNodeId>, KrpcDataStreamRecvr*>,
+  /// because that requires a bunch of copying of ids for lookup.
+  typedef
+      boost::unordered_multimap<uint32_t, std::shared_ptr<KrpcDataStreamRecvr>> RecvrMap;
+  RecvrMap receiver_map_;
+
+  /// (Fragment instance id, Plan node id) pair that uniquely identifies a stream.
+  typedef std::pair<impala::TUniqueId, PlanNodeId> RecvrId;
+
+  /// Less-than ordering for RecvrIds.
+  struct ComparisonOp {
+    bool operator()(const RecvrId& a, const RecvrId& b) {
+      if (a.first.hi < b.first.hi) {
+        return true;
+      } else if (a.first.hi > b.first.hi) {
+        return false;
+      } else if (a.first.lo < b.first.lo) {
+        return true;
+      } else if (a.first.lo > b.first.lo) {
+        return false;
+      }
+      return a.second < b.second;
+    }
+  };
+
+  /// An ordered set of receiver IDs so that we can easily find all receiver IDs
+  /// belonging to a fragment instance (by calling std::set::lower_bound(finst_id, 0)
+  /// to find the first entry and iterating until the entry's finst_id doesn't match).
+  ///
+  /// There is one entry in fragment_recvr_set_ for every entry in receiver_map_.
+  typedef std::set<RecvrId, ComparisonOp> FragmentRecvrSet;
+  FragmentRecvrSet fragment_recvr_set_;
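+
+  /// For example, Cancel() can visit every receiver of a fragment instance along
+  /// these lines (a sketch; the actual iteration lives in the .cc file):
+  ///
+  ///   RecvrId search_id(fragment_instance_id, 0);
+  ///   for (auto it = fragment_recvr_set_.lower_bound(search_id);
+  ///        it != fragment_recvr_set_.end() && it->first == fragment_instance_id;
+  ///        ++it) {
+  ///     // it->second is the PlanNodeId of one receiver belonging to the instance.
+  ///   }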
+
+  /// List of waiting senders that need to be processed when a receiver is created.
+  /// Access is only thread-safe when lock_ is held.
+  struct EarlySendersList {
+    /// Queue of contexts for senders which called AddData() before the receiver was
+    /// set up.
+    std::vector<std::unique_ptr<TransmitDataCtx>> waiting_sender_ctxs;
+
+    /// Queue of contexts for senders that called EndDataStream() before the receiver was
+    /// set up.
+    std::vector<std::unique_ptr<EndDataStreamCtx>> closed_sender_ctxs;
+
+    /// Monotonic time of arrival of the first sender in ms. Used to notify senders when
+    /// they have waited too long.
+    int64_t arrival_time;
+
+    EarlySendersList() : arrival_time(MonotonicMillis()) { }
+
+    /// Defining the move constructor as vectors of unique_ptr are not copyable.
+    EarlySendersList(EarlySendersList&& other)
+      : waiting_sender_ctxs(move(other.waiting_sender_ctxs)),
+        closed_sender_ctxs(move(other.closed_sender_ctxs)),
+        arrival_time(other.arrival_time) { }
+
+    /// Defining the move operator= as vectors of unique_ptr are not copyable.
+    EarlySendersList& operator=(EarlySendersList&& other) {
+      waiting_sender_ctxs = move(other.waiting_sender_ctxs);
+      closed_sender_ctxs = move(other.closed_sender_ctxs);
+      arrival_time = other.arrival_time;
+      return *this;
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(EarlySendersList);
+  };
+
+  /// Map from stream (which identifies a receiver) to a list of senders that should be
+  /// processed when that receiver arrives.
+  ///
+  /// Entries are removed from early_senders_map_ when either a) a receiver is created
+  /// or b) the Maintenance() thread detects that the longest-waiting sender has been
+  /// waiting for more than FLAGS_datastream_sender_timeout_ms.
+  typedef boost::unordered_map<RecvrId, EarlySendersList> EarlySendersMap;
+  EarlySendersMap early_senders_map_;
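+
+  /// When the receiver finally registers, CreateRecvr() can drain its entry roughly
+  /// as follows (a sketch; the hand-off details live in the .cc file):
+  ///
+  ///   auto it = early_senders_map_.find(recvr_id);
+  ///   if (it != early_senders_map_.end()) {
+  ///     EarlySendersList early_senders = std::move(it->second);
+  ///     early_senders_map_.erase(it);
+  ///     // Hand waiting_sender_ctxs to the deserialization pool and respond to the
+  ///     // contexts in closed_sender_ctxs now that the receiver exists.
+  ///   }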
+
+  /// Map from monotonic time, in ms, that a stream should be evicted from
+  /// closed_stream_cache to its RecvrId. Used to evict old streams from cache
+  /// efficiently. Using multimap as there may be multiple streams with the same
+  /// eviction time.
+  typedef std::multimap<int64_t, RecvrId> ClosedStreamMap;
+  ClosedStreamMap closed_stream_expirations_;
+
+  /// Cache of recently closed RecvrIds. Used to allow straggling senders to fail fast by
+  /// checking this cache, rather than waiting for the missed-receiver timeout to elapse.
+  boost::unordered_set<RecvrId> closed_stream_cache_;
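+
+  /// Expired entries can be evicted with a single pass over the multimap, e.g.
+  /// (a sketch of the idea; the real eviction happens in the .cc file):
+  ///
+  ///   int64_t now = MonotonicMillis();
+  ///   auto it = closed_stream_expirations_.begin();
+  ///   while (it != closed_stream_expirations_.end() && it->first < now) {
+  ///     closed_stream_cache_.erase(it->second);
+  ///     it = closed_stream_expirations_.erase(it);
+  ///   }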
+
+  /// Adds a TransmitData() RPC request to the early senders list. Used for storing
+  /// TransmitData() RPC requests which arrive before the receiver finishes preparing.
+  void AddEarlySender(const TUniqueId& fragment_instance_id,
+      const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
+      kudu::rpc::RpcContext* context);
+
+  /// Adds an EndDataStream() RPC request to the early senders list. Used for storing
+  /// EndDataStream() RPC requests which arrive before the receiver finishes preparing.
+  void AddEarlyClosedSender(const TUniqueId& fragment_instance_id,
+      const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
+      kudu::rpc::RpcContext* context);
+
+  /// Enqueue 'num_requests' requests to the deserialization thread pool to drain the
+  /// deferred RPCs for the receiver with fragment instance id of 'finst_id', plan node
+  /// id of 'dest_node_id'. 'sender_id' identifies the sender queue if the receiver
+  /// belongs to a merging exchange node. This may block so no lock should be held when
+  /// calling this function.
+  void EnqueueDeserializeTask(const TUniqueId& finst_id, PlanNodeId dest_node_id,
+      int sender_id, int num_requests);
+
+  /// Worker function for deserializing a deferred RPC request stored in task.
+  /// Called from the deserialization thread.
+  void DeserializeThreadFn(int thread_id, const DeserializeTask& task);
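+
+  /// In essence (a sketch; the receiver-side method name below is only illustrative):
+  ///
+  ///   shared_ptr<KrpcDataStreamRecvr> recvr;
+  ///   {
+  ///     lock_guard<mutex> l(lock_);
+  ///     bool already_unregistered;
+  ///     recvr = FindRecvr(task.finst_id, task.dest_node_id, &already_unregistered);
+  ///   }
+  ///   // If the receiver still exists, let it deserialize and enqueue one of the
+  ///   // RPCs deferred for this sender queue, then respond to that RPC.
+  ///   if (recvr != nullptr) recvr->ProcessDeferredRpc(task.sender_id);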
+
+  /// Return a shared_ptr to the receiver for given fragment_instance_id/dest_node_id, or
+  /// an empty shared_ptr if not found. Must be called with lock_ already held. If the
+  /// stream was recently closed, sets *already_unregistered to true to indicate to caller
+  /// that stream will not be available in the future. In that case, the returned
+  /// shared_ptr will be empty.
+  std::shared_ptr<KrpcDataStreamRecvr> FindRecvr(const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, bool* already_unregistered);
+
+  /// Remove receiver for fragment_instance_id/dest_node_id from the map. Will also
+  /// cancel all the sender queues of the receiver.
+  Status DeregisterRecvr(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id);
+
+  /// Returns a hash value generated from the fragment instance id and dest node id.
+  /// The hash value is the key in the 'receiver_map_' for the receiver of
+  /// fragment_instance_id/dest_node_id.
+  uint32_t GetHashValue(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id);
 
-  [[noreturn]] void Cancel(const TUniqueId& fragment_instance_id) override;
+  /// Responds to a sender when an RPC request has timed out waiting for the receiver
+  /// to show up. 'ctx' is the encapsulated RPC request context (e.g. TransmitDataCtx).
+  template<typename ContextType, typename RequestPBType>
+  void RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx);
 
+  /// Notifies any sender that has been waiting for its receiver for more than
+  /// FLAGS_datastream_sender_timeout_ms.
+  ///
+  /// Run by maintenance_thread_.
+  void Maintenance();
 };
 
 } // namespace impala
-#endif /* IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H */
+#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
