Repository: incubator-impala
Updated Branches:
  refs/heads/master d40ada01b -> d3a6b0bf2


IMPALA-5744: Add 'use_krpc' flag and create DataStream interface

This patch introduces a 'use_krpc' flag and creates an abstract
interface for the DataStreamRecvr/Mgr.

The 'use_krpc' flag defaults to 'false'. Because KRPC is not supported
yet, cluster startup will abort with an error if the flag is set to 'true'.

The DataStreamSender implements the same virtual interface as the
DataSink, so a pure virtual base class for the DataStreamSender would
essentially be an empty class. Therefore, no such base class is introduced.

The new interfaces are pure virtual base classes and are named
DataStream*Base. Using pure virtual base classes may result in some code
duplication, but the current approach optimizes for the eventual
removal of the Thrift implementations.

Stubs for the KRPC implementations are also introduced and are named
KrpcDataStream*. For now they only abort with a fatal error if they
are used. Their actual implementations will be filled in in a later
patch.

When the time comes to standardize on the KRPC implementations of
DataStream*Base, we will remove the DataStream*Base classes and the
Thrift versions of the classes, and rename KrpcDataStream* to
DataStream*. We will also rename all client references to
DataStream*Base to DataStream*.

Also cleaned up some spurious #includes.

Change-Id: I5d52245154e910529a68f53049520238eca16241
Reviewed-on: http://gerrit.cloudera.org:8080/7542
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d3a6b0bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d3a6b0bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d3a6b0bf

Branch: refs/heads/master
Commit: d3a6b0bf2da1fba883b10c570ce98aff61d1b034
Parents: d40ada0
Author: Sailesh Mukil <sail...@cloudera.com>
Authored: Mon Jul 31 15:34:33 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Wed Aug 9 22:45:51 2017 +0000

----------------------------------------------------------------------
 be/src/exec/data-sink.cc                  |  4 ++
 be/src/exec/exchange-node.cc              |  3 ++
 be/src/exec/exchange-node.h               | 16 +++----
 be/src/runtime/CMakeLists.txt             |  4 +-
 be/src/runtime/data-stream-mgr-base.h     | 62 ++++++++++++++++++++++++++
 be/src/runtime/data-stream-mgr.cc         |  5 ++-
 be/src/runtime/data-stream-mgr.h          | 12 ++---
 be/src/runtime/data-stream-recvr-base.h   | 60 +++++++++++++++++++++++++
 be/src/runtime/data-stream-recvr.h        | 25 ++++++-----
 be/src/runtime/data-stream-test.cc        | 16 ++++---
 be/src/runtime/exec-env.cc                | 23 +++++++++-
 be/src/runtime/exec-env.h                 | 13 +++++-
 be/src/runtime/fragment-instance-state.cc |  2 +-
 be/src/runtime/krpc-data-stream-mgr.cc    | 58 ++++++++++++++++++++++++
 be/src/runtime/krpc-data-stream-mgr.h     | 52 +++++++++++++++++++++
 be/src/runtime/krpc-data-stream-recvr.cc  | 62 ++++++++++++++++++++++++++
 be/src/runtime/krpc-data-stream-recvr.h   | 47 +++++++++++++++++++
 be/src/runtime/runtime-state.cc           |  4 +-
 be/src/runtime/runtime-state.h            |  4 +-
 be/src/service/impala-server.cc           |  3 +-
 20 files changed, 436 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 84015df..fe23694 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -37,6 +37,8 @@
 
 #include "common/names.h"
 
+DECLARE_bool(use_krpc);
+
 using strings::Substitute;
 
 namespace impala {
@@ -58,6 +60,8 @@ Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream 
sink.");
 
+      // TODO: Remove DCHECK when KRPC is supported.
+      DCHECK(!FLAGS_use_krpc);
       // TODO: figure out good buffer size based on size of output row
       *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, 
row_desc,
           thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 9d1383d..9b5f548 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -19,6 +19,7 @@
 
 #include <boost/scoped_ptr.hpp>
 
+#include "exprs/scalar-expr.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-recvr.h"
 #include "runtime/runtime-state.h"
@@ -27,6 +28,8 @@
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"
+#include "util/tuple-row-compare.h"
+
 #include "gen-cpp/PlanNodes_types.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 4781213..4bb6ce3 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -24,7 +24,7 @@
 
 namespace impala {
 
-class DataStreamRecvr;
+class DataStreamRecvrBase;
 class RowBatch;
 class ScalarExpr;
 class TupleRowComparator;
@@ -34,12 +34,12 @@ class TupleRowComparator;
 /// is_merging is set to indicate that rows from different senders must be 
merged
 /// according to the sort parameters in sort_exec_exprs_. (It is assumed that 
the rows
 /// received from the senders themselves are sorted.)
-/// If is_merging_ is true, the exchange node creates a DataStreamRecvr with 
the
+/// If is_merging_ is true, the exchange node creates a DataStreamRecvrBase 
with the
 /// is_merging_ flag and retrieves retrieves rows from the receiver via calls 
to
-/// DataStreamRecvr::GetNext(). It also prepares, opens and closes the 
ordering exprs in
-/// its SortExecExprs member that are used to compare rows.
+/// DataStreamRecvrBase::GetNext(). It also prepares, opens and closes the 
ordering exprs
+/// in its SortExecExprs member that are used to compare rows.
 /// If is_merging_ is false, the exchange node directly retrieves batches from 
the row
-/// batch queue of the DataStreamRecvr via calls to 
DataStreamRecvr::GetBatch().
+/// batch queue of the DataStreamRecvrBase via calls to 
DataStreamRecvrBase::GetBatch().
 class ExchangeNode : public ExecNode {
  public:
   ExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs);
@@ -63,7 +63,7 @@ class ExchangeNode : public ExecNode {
 
  private:
   /// Implements GetNext() for the case where is_merging_ is true. Delegates 
the GetNext()
-  /// call to the underlying DataStreamRecvr.
+  /// call to the underlying DataStreamRecvrBase.
   Status GetNextMerging(RuntimeState* state, RowBatch* output_batch, bool* 
eos);
 
   /// Resets input_batch_ to the next batch from the from stream_recvr_'s 
queue.
@@ -72,10 +72,10 @@ class ExchangeNode : public ExecNode {
 
   int num_senders_;  // needed for stream_recvr_ construction
 
-  /// The underlying DataStreamRecvr instance. Ownership is shared between this
+  /// The underlying DataStreamRecvrBase instance. Ownership is shared between 
this
   /// exchange node instance and the DataStreamMgr used to create the receiver.
   /// stream_recvr_->Close() must be called before this instance is destroyed.
-  std::shared_ptr<DataStreamRecvr> stream_recvr_;
+  std::shared_ptr<DataStreamRecvrBase> stream_recvr_;
 
   /// our input rows are a prefix of the rows we produce
   RowDescriptor input_row_desc_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 391fd01..c9aacc9 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -29,8 +29,8 @@ add_library(Runtime
   coordinator.cc
   coordinator-backend-state.cc
   data-stream-mgr.cc
-  data-stream-sender.cc
   data-stream-recvr.cc
+  data-stream-sender.cc
   debug-options.cc
   descriptors.cc
   disk-io-mgr.cc
@@ -44,6 +44,8 @@ add_library(Runtime
   hbase-table-factory.cc
   hdfs-fs-cache.cc
   initial-reservations.cc
+  krpc-data-stream-mgr.cc
+  krpc-data-stream-recvr.cc
   lib-cache.cc
   mem-tracker.cc
   mem-pool.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h 
b/be/src/runtime/data-stream-mgr-base.h
new file mode 100644
index 0000000..53798b1
--- /dev/null
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H
+#define IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H
+
+#include "common/status.h"
+#include "runtime/descriptors.h"  // for PlanNodeId
+
+namespace impala {
+
+class DataStreamRecvrBase;
+class RuntimeProfile;
+class RuntimeState;
+class TRowBatch;
+class TUniqueId;
+
+/// Interface for a singleton class which manages all incoming data streams at 
a backend
+/// node.
+/// TODO: This is a temporary pure virtual base class that defines the basic 
interface for
+/// 2 parallel implementations of the DataStreamMgrBase, one each for Thrift 
and KRPC.
+/// Remove this in favor of the KRPC implementation when possible.
+class DataStreamMgrBase {
+ public:
+  DataStreamMgrBase() {}
+
+  virtual ~DataStreamMgrBase() { }
+
+  /// Create a receiver for a specific fragment_instance_id/node_id 
destination;
+  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
+      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int num_senders, int buffer_size, 
RuntimeProfile* profile,
+      bool is_merging) = 0;
+
+  /// Notifies the recvr associated with the fragment/node id that the 
specified
+  /// sender has closed.
+  virtual Status CloseSender(const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int sender_id) = 0;
+
+  /// Closes all receivers registered for fragment_instance_id immediately.
+  virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;
+
+};
+
+}
+
+#endif /* IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc 
b/be/src/runtime/data-stream-mgr.cc
index f36f4c9..82bc01f 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -64,6 +64,9 @@ DataStreamMgr::DataStreamMgr(MetricGroup* metrics) {
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 
+DataStreamMgr::~DataStreamMgr() {
+}
+
 inline uint32_t DataStreamMgr::GetHashValue(
     const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
   uint32_t value = RawValue::GetHashValue(&fragment_instance_id.lo, 
TYPE_BIGINT, 0);
@@ -72,7 +75,7 @@ inline uint32_t DataStreamMgr::GetHashValue(
   return value;
 }
 
-shared_ptr<DataStreamRecvr> DataStreamMgr::CreateRecvr(RuntimeState* state,
+shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(RuntimeState* state,
     const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* 
profile,
     bool is_merging) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 7819c24..5dae908 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -28,6 +28,7 @@
 
 #include "common/status.h"
 #include "common/object-pool.h"
+#include "runtime/data-stream-mgr-base.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
 #include "util/metrics.h"
 #include "util/promise.h"
@@ -63,9 +64,10 @@ class TRowBatch;
 ///
 /// TODO: The recv buffers used in DataStreamRecvr should count against
 /// per-query memory limits.
-class DataStreamMgr {
+class DataStreamMgr : public DataStreamMgrBase {
  public:
   DataStreamMgr(MetricGroup* metrics);
+  virtual ~DataStreamMgr() override;
 
   /// Create a receiver for a specific fragment_instance_id/node_id 
destination;
   /// If is_merging is true, the receiver maintains a separate queue of 
incoming row
@@ -73,10 +75,10 @@ class DataStreamMgr {
   /// single stream.
   /// Ownership of the receiver is shared between this DataStream mgr instance 
and the
   /// caller.
-  std::shared_ptr<DataStreamRecvr> CreateRecvr(RuntimeState* state,
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, int buffer_size, 
RuntimeProfile* profile,
-      bool is_merging);
+      bool is_merging) override;
 
   /// Adds a row batch to the recvr identified by 
fragment_instance_id/dest_node_id
   /// if the recvr has not been cancelled. sender_id identifies the sender 
instance
@@ -94,10 +96,10 @@ class DataStreamMgr {
   /// sender has closed.
   /// Returns OK if successful, error status otherwise.
   Status CloseSender(const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
-      int sender_id);
+      int sender_id) override;
 
   /// Closes all receivers registered for fragment_instance_id immediately.
-  void Cancel(const TUniqueId& fragment_instance_id);
+  void Cancel(const TUniqueId& fragment_instance_id) override;
 
  private:
   friend class DataStreamRecvr;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/data-stream-recvr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr-base.h 
b/be/src/runtime/data-stream-recvr-base.h
new file mode 100644
index 0000000..e0d06fe
--- /dev/null
+++ b/be/src/runtime/data-stream-recvr-base.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H
+#define IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H
+
+#include "common/status.h"
+
+namespace impala {
+
+class RowBatch;
+class TupleRowComparator;
+
+/// Interface for a single receiver of a m:n data stream.
+/// DataStreamRecvrBase implementations should maintain one or more queues of 
row batches
+/// received by a DataStreamMgrBase implementation from one or more sender 
fragment
+/// instances.
+/// TODO: This is a temporary pure virtual base class that defines the basic 
interface for
+/// 2 parallel implementations of the DataStreamRecvrBase, one each for Thrift 
and KRPC.
+/// Remove this in favor of the KRPC implementation when possible.
+class DataStreamRecvrBase {
+ public:
+  DataStreamRecvrBase() { }
+  virtual ~DataStreamRecvrBase() { }
+
+  /// Returns next row batch in data stream.
+  virtual Status GetBatch(RowBatch** next_batch) = 0;
+
+  virtual void Close() = 0;
+
+  /// Create a SortedRunMerger instance to merge rows from multiple senders 
according to
+  /// the specified row comparator.
+  virtual Status CreateMerger(const TupleRowComparator& less_than) = 0;
+
+  /// Fill output_batch with the next batch of rows.
+  virtual Status GetNext(RowBatch* output_batch, bool* eos) = 0;
+
+  /// Transfer all resources from the current batches being processed from 
each sender
+  /// queue to the specified batch.
+  virtual void TransferAllResources(RowBatch* transfer_batch) = 0;
+
+};
+
+}
+
+#endif /* IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h 
b/be/src/runtime/data-stream-recvr.h
index 96adfc4..468f58a 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -18,15 +18,17 @@
 #ifndef IMPALA_RUNTIME_DATA_STREAM_RECVR_H
 #define IMPALA_RUNTIME_DATA_STREAM_RECVR_H
 
+#include "data-stream-recvr-base.h"
+
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
+#include "common/atomic.h"
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h"   // for TUniqueId
-#include "gen-cpp/Results_types.h" // for TRowBatch
 #include "runtime/descriptors.h"
-#include "util/tuple-row-compare.h"
+#include "util/runtime-profile.h"
 
 namespace impala {
 
@@ -34,9 +36,12 @@ class DataStreamMgr;
 class SortedRunMerger;
 class MemTracker;
 class RowBatch;
-class RuntimeProfile;
+class TRowBatch;
+class TupleRowCompare;
 
 /// Single receiver of an m:n data stream.
+/// This is for use by the DataStreamMgr, which is the implementation of the 
abstract
+/// class DataStreamMgrBase that depends on Thrift.
 /// DataStreamRecvr maintains one or more queues of row batches received by a
 /// DataStreamMgr from one or more sender fragment instances.
 /// Receivers are created via DataStreamMgr::CreateRecvr().
@@ -60,9 +65,9 @@ class RuntimeProfile;
 //
 /// DataStreamRecvr::Close() must be called by the caller of CreateRecvr() to 
remove the
 /// recvr instance from the tracking structure of its DataStreamMgr in all 
cases.
-class DataStreamRecvr {
+class DataStreamRecvr : public DataStreamRecvrBase {
  public:
-  ~DataStreamRecvr();
+  virtual ~DataStreamRecvr() override;
 
   /// Returns next row batch in data stream; blocks if there aren't any.
   /// Retains ownership of the returned batch. The caller must acquire data 
from the
@@ -70,23 +75,23 @@ class DataStreamRecvr {
   /// eos. Must only be called if is_merging_ is false.
   /// TODO: This is currently only exposed to the non-merging version of the 
exchange.
   /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, 
bool* eos).
-  Status GetBatch(RowBatch** next_batch);
+  Status GetBatch(RowBatch** next_batch) override;
 
   /// Deregister from DataStreamMgr instance, which shares ownership of this 
instance.
-  void Close();
+  void Close() override;
 
   /// Create a SortedRunMerger instance to merge rows from multiple sender 
according to the
   /// specified row comparator. Fetches the first batches from the individual 
sender
   /// queues. The exprs used in less_than must have already been prepared and 
opened.
-  Status CreateMerger(const TupleRowComparator& less_than);
+  Status CreateMerger(const TupleRowComparator& less_than) override;
 
   /// Fill output_batch with the next batch of rows obtained by merging the 
per-sender
   /// input streams. Must only be called if is_merging_ is true.
-  Status GetNext(RowBatch* output_batch, bool* eos);
+  Status GetNext(RowBatch* output_batch, bool* eos) override;
 
   /// Transfer all resources from the current batches being processed from 
each sender
   /// queue to the specified batch.
-  void TransferAllResources(RowBatch* transfer_batch);
+  void TransferAllResources(RowBatch* transfer_batch) override;
 
   const TUniqueId& fragment_instance_id() const { return 
fragment_instance_id_; }
   PlanNodeId dest_node_id() const { return dest_node_id_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 990d8bb..93862fd 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -27,9 +27,11 @@
 #include "rpc/thrift-server.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/data-stream-mgr-base.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/data-stream-sender.h"
+#include "runtime/data-stream-recvr-base.h"
 #include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
 #include "runtime/client-cache.h"
@@ -214,7 +216,7 @@ class DataStreamTest : public testing::Test {
   int64_t* tuple_mem_;
 
   // receiving node
-  DataStreamMgr* stream_mgr_;
+  DataStreamMgrBase* stream_mgr_;
   ThriftServer* server_;
 
   // sending node(s)
@@ -238,7 +240,7 @@ class DataStreamTest : public testing::Test {
     int receiver_num;
 
     thread* thread_handle;
-    shared_ptr<DataStreamRecvr> stream_recvr;
+    shared_ptr<DataStreamRecvrBase> stream_recvr;
     Status status;
     int num_rows_received;
     multiset<int64_t> data_values;
@@ -452,7 +454,10 @@ class DataStreamTest : public testing::Test {
 
   // Start backend in separate thread.
   void StartBackend() {
-    boost::shared_ptr<ImpalaTestBackend> handler(new 
ImpalaTestBackend(stream_mgr_));
+    // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived 
type
+    // DataStreamMgr, since ImpalaTestBackend() accepts only DataStreamMgr*.
+    boost::shared_ptr<ImpalaTestBackend> handler(
+        new ImpalaTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
     boost::shared_ptr<TProcessor> processor(new 
ImpalaInternalServiceProcessor(handler));
     ThriftServerBuilder("DataStreamTest backend", processor, 
FLAGS_port).Build(&server_);
     server_->Start();
@@ -606,8 +611,9 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   // Start just one receiver.
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
-  shared_ptr<DataStreamRecvr> stream_recvr = 
stream_mgr_->CreateRecvr(runtime_state.get(),
-      row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(), false);
+  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
+      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, 
profile.get(),
+      false);
 
   // Perform tear down, but keep a reference to the receiver so that it is 
deleted last
   // (to confirm that the destructor does not access invalid state after 
tear-down).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 3427c30..4551232 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -37,6 +37,7 @@
 #include "runtime/disk-io-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
@@ -75,6 +76,9 @@ DEFINE_int32(state_store_subscriber_port, 23000,
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
 DEFINE_bool(disable_admission_control, false, "Disables admission control.");
+DEFINE_bool_hidden(use_krpc, false, "Used to indicate whether to use Kudu RPC 
for the "
+    "DataStream subsystem, or the Thrift RPC layer instead. Defaults to false. 
"
+    "KRPC not yet supported");
 
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
@@ -138,7 +142,6 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int subscriber_port,
     int webserver_port, const string& statestore_host, int statestore_port)
   : obj_pool_(new ObjectPool),
     metrics_(new MetricGroup("impala-metrics")),
-    stream_mgr_(new DataStreamMgr(metrics_.get())),
     impalad_client_cache_(
         new 
ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
             FLAGS_backend_client_rpc_timeout_ms, 
FLAGS_backend_client_rpc_timeout_ms, "",
@@ -162,6 +165,13 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int subscriber_port,
     query_exec_mgr_(new QueryExecMgr()),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     backend_address_(MakeNetworkAddress(hostname, backend_port)) {
+
+  if (FLAGS_use_krpc) {
+    stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
+  } else {
+    stream_mgr_.reset(new DataStreamMgr(metrics_.get()));
+  }
+
   request_pool_service_.reset(new RequestPoolService(metrics_.get()));
 
   TNetworkAddress subscriber_address = MakeNetworkAddress(hostname, 
subscriber_port);
@@ -357,4 +367,15 @@ Status ExecEnv::GetKuduClient(
   }
   return Status::OK();
 }
+
+DataStreamMgr* ExecEnv::ThriftStreamMgr() {
+  DCHECK(!FLAGS_use_krpc);
+  return dynamic_cast<DataStreamMgr*>(stream_mgr_.get());
+}
+
+KrpcDataStreamMgr* ExecEnv::KrpcStreamMgr() {
+  DCHECK(FLAGS_use_krpc);
+  return dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_.get());
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 63d2e0b..6293988 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -36,6 +36,7 @@ namespace impala {
 class AdmissionController;
 class BufferPool;
 class CallableThreadPool;
+class DataStreamMgrBase;
 class DataStreamMgr;
 class DiskIoMgr;
 class QueryExecMgr;
@@ -43,6 +44,7 @@ class Frontend;
 class HBaseTableFactory;
 class HdfsFsCache;
 class ImpalaServer;
+class KrpcDataStreamMgr;
 class LibCache;
 class MemTracker;
 class MetricGroup;
@@ -84,7 +86,14 @@ class ExecEnv {
   /// TODO: Should ExecEnv own the ImpalaServer as well?
   void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }
 
-  DataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
+  DataStreamMgrBase* stream_mgr() { return stream_mgr_.get(); }
+
+  /// TODO: Remove once a single DataStreamMgrBase implementation is 
standardized on.
+  /// Clients of DataStreamMgrBase should use stream_mgr() unless they need to 
access
+  /// members that are not a part of the DataStreamMgrBase interface.
+  DataStreamMgr* ThriftStreamMgr();
+  KrpcDataStreamMgr* KrpcStreamMgr();
+
   ImpalaBackendClientCache* impalad_client_cache() {
     return impalad_client_cache_.get();
   }
@@ -138,7 +147,7 @@ class ExecEnv {
  private:
   boost::scoped_ptr<ObjectPool> obj_pool_;
   boost::scoped_ptr<MetricGroup> metrics_;
-  boost::scoped_ptr<DataStreamMgr> stream_mgr_;
+  boost::scoped_ptr<DataStreamMgrBase> stream_mgr_;
   boost::scoped_ptr<Scheduler> scheduler_;
   boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index 07b3f1c..99de62d 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -36,10 +36,10 @@
 #include "runtime/backend-client.h"
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/client-cache.h"
+#include "runtime/data-stream-mgr.h"
 #include "runtime/runtime-state.h"
 #include "runtime/query-state.h"
 #include "runtime/query-state.h"
-#include "runtime/data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "scheduling/query-schedule.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc 
b/be/src/runtime/krpc-data-stream-mgr.cc
new file mode 100644
index 0000000..0515897
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/krpc-data-stream-mgr.h"
+
+#include "common/logging.h"
+
+DECLARE_bool(use_krpc);
+
+namespace impala {
+
+[[noreturn]] static void AbortUnsupportedFeature() {
+  // We should have gotten here only if the FLAGS_use_krpc is set to true.
+  CHECK(FLAGS_use_krpc) << "Shouldn't reach here unless startup flag 
'use_krpc' "
+      "is true.";
+  // KRPC isn't supported yet, so abort.
+  ABORT_WITH_ERROR("KRPC is not supported yet. Please set the 'use_krpc' flag 
to "
+      "false and restart the cluster.");
+}
+
+[[noreturn]] KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics) {
+  AbortUnsupportedFeature();
+}
+
+KrpcDataStreamMgr::~KrpcDataStreamMgr() {
+}
+
+[[noreturn]] std::shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(RuntimeState* state,
+    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
+    PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* 
profile,
+    bool is_merging) {
+  AbortUnsupportedFeature();
+}
+
+[[noreturn]] Status KrpcDataStreamMgr::CloseSender(const TUniqueId& 
fragment_instance_id,
+    PlanNodeId dest_node_id, int sender_id) {
+  AbortUnsupportedFeature();
+}
+
+[[noreturn]] void KrpcDataStreamMgr::Cancel(const TUniqueId& 
fragment_instance_id) {
+  AbortUnsupportedFeature();
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h 
b/be/src/runtime/krpc-data-stream-mgr.h
new file mode 100644
index 0000000..adaebda
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
+#define IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
+
+#include "runtime/data-stream-mgr-base.h"
+
+#include "common/status.h"
+#include "runtime/descriptors.h"  // for PlanNodeId
+
+namespace impala {
+
+class DataStreamRecvrBase;
+class MetricGroup;
+class RuntimeProfile;
+class RuntimeState;
+class TUniqueId;
+
+/// TODO: Stub for the KRPC version of the DataStreamMgr. Fill with actual
+/// implementation.
+///
+/// Until then, the constructor, CreateRecvr(), CloseSender() and Cancel() are all
+/// declared [[noreturn]]; the current stub implementations abort the process.
+class KrpcDataStreamMgr : public DataStreamMgrBase {
+ public:
+  /// 'explicit' prevents an accidental implicit conversion from MetricGroup*.
+  [[noreturn]] explicit KrpcDataStreamMgr(MetricGroup* metrics);
+  virtual ~KrpcDataStreamMgr() override;
+
+  [[noreturn]] std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
+      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int num_senders, int buffer_size,
+      RuntimeProfile* profile, bool is_merging) override;
+
+  [[noreturn]] Status CloseSender(const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int sender_id) override;
+
+  [[noreturn]] void Cancel(const TUniqueId& fragment_instance_id) override;
+};
+
+} // namespace impala
+#endif /* IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
new file mode 100644
index 0000000..0731d45
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/krpc-data-stream-recvr.h"
+
+#include "common/logging.h"
+
+DECLARE_bool(use_krpc);
+
+namespace impala {
+
+namespace {
+
+/// Shared exit path for every KrpcDataStreamRecvr stub below. It first verifies, via
+/// CHECK, that the 'use_krpc' startup flag is set, since that is the only way the
+/// KRPC code path should be selected. It then aborts with an error telling the
+/// operator to disable the flag, because KRPC is not supported yet.
+[[noreturn]] void AbortUnsupportedFeature() {
+  CHECK(FLAGS_use_krpc)
+      << "Shouldn't reach here unless startup flag 'use_krpc' is true.";
+  ABORT_WITH_ERROR("KRPC is not supported yet. Please set the 'use_krpc' flag to "
+      "false and restart the cluster.");
+}
+
+} // anonymous namespace
+
+// Placeholder implementation. The constructor and every method except the
+// destructor end in AbortUnsupportedFeature() and therefore never return.
+
+[[noreturn]] KrpcDataStreamRecvr::KrpcDataStreamRecvr() {
+  AbortUnsupportedFeature();
+}
+
+KrpcDataStreamRecvr::~KrpcDataStreamRecvr() {}
+
+[[noreturn]] Status KrpcDataStreamRecvr::GetBatch(RowBatch** next_batch) {
+  AbortUnsupportedFeature();
+}
+
+[[noreturn]] void KrpcDataStreamRecvr::Close() {
+  AbortUnsupportedFeature();
+}
+
+[[noreturn]] Status KrpcDataStreamRecvr::CreateMerger(
+    const TupleRowComparator& less_than) {
+  AbortUnsupportedFeature();
+}
+
+[[noreturn]] Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
+  AbortUnsupportedFeature();
+}
+
+[[noreturn]] void KrpcDataStreamRecvr::TransferAllResources(
+    RowBatch* transfer_batch) {
+  AbortUnsupportedFeature();
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
new file mode 100644
index 0000000..be46ae3
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
+#define IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
+
+#include "runtime/data-stream-recvr-base.h"
+
+#include "common/status.h"
+
+namespace impala {
+
+class RowBatch;
+class TupleRowComparator;
+
+/// TODO: Stub for the KRPC version of the DataStreamRecvr. Fill with actual
+/// implementation.
+///
+/// Until then, the constructor and every method declared here other than the
+/// destructor are [[noreturn]]; the current stub implementations abort the process.
+class KrpcDataStreamRecvr : public DataStreamRecvrBase {
+ public:
+  [[noreturn]] KrpcDataStreamRecvr();
+  virtual ~KrpcDataStreamRecvr() override;
+
+  [[noreturn]] Status GetBatch(RowBatch** next_batch) override;
+  [[noreturn]] void Close() override;
+  [[noreturn]] Status CreateMerger(const TupleRowComparator& less_than) override;
+  [[noreturn]] Status GetNext(RowBatch* output_batch, bool* eos) override;
+  [[noreturn]] void TransferAllResources(RowBatch* transfer_batch) override;
+};
+
+} // namespace impala
+
+#endif /* IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index b986e8d..b10a9e5 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -33,7 +33,7 @@
 #include "exprs/scalar-fn-call.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/data-stream-mgr.h"
+#include "runtime/data-stream-mgr-base.h"
 #include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
@@ -270,7 +270,7 @@ DiskIoMgr* RuntimeState::io_mgr() {
   return exec_env_->disk_io_mgr();
 }
 
-DataStreamMgr* RuntimeState::stream_mgr() {
+DataStreamMgrBase* RuntimeState::stream_mgr() {
   return exec_env_->stream_mgr();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 665de7b..49ea9a6 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -48,7 +48,7 @@ class Status;
 class TimestampValue;
 class TUniqueId;
 class ExecEnv;
-class DataStreamMgr;
+class DataStreamMgrBase;
 class HBaseTableFactory;
 class TPlanFragmentCtx;
 class TPlanFragmentInstanceCtx;
@@ -121,7 +121,7 @@ class RuntimeState {
         : no_instance_id_;
   }
   ExecEnv* exec_env() { return exec_env_; }
-  DataStreamMgr* stream_mgr();
+  DataStreamMgrBase* stream_mgr();
   HBaseTableFactory* htable_factory();
   ImpalaBackendClientCache* impalad_client_cache();
   CatalogServiceClientCache* catalogd_client_cache();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d3a6b0bf/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 92549cf..b476a06 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1164,7 +1164,7 @@ void ImpalaServer::TransmitData(
   // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
   // of having to copy its data
   if (params.row_batch.num_rows > 0) {
-    Status status = exec_env_->stream_mgr()->AddData(
+    Status status = exec_env_->ThriftStreamMgr()->AddData(
         params.dest_fragment_instance_id, params.dest_node_id, 
params.row_batch,
         params.sender_id);
     status.SetTStatus(&return_val);
@@ -1923,6 +1923,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
   DCHECK((beeswax_port == 0) == (beeswax_server == nullptr));
   DCHECK((hs2_port == 0) == (hs2_server == nullptr));
   DCHECK((be_port == 0) == (be_server == nullptr));
+
   if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
     return Status("Impala does not have a valid role configured. "
         "Either --is_coordinator or --is_executor must be set to true.");


Reply via email to