Repository: incubator-impala
Updated Branches:
  refs/heads/master 7191e1b28 -> eb54287fb


IMPALA-5167: Reduce the number of Kudu clients created (BE)

Creating Kudu clients is very expensive as each will fetch
metadata from the Kudu master, so we should minimize the
number of Kudu clients that get created.

This patch stores a map from Kudu master addresses to Kudu
clients in the ExecEnv to be used across the BE for all
queries. Another patch will address the FE.

This relies on a change on the Kudu side that clears
non-covered range entries from the client's cache on
table open (fdc022fe6231af20e307012d98c35b16cbfa7b33)

Testing:
- Ran a stress test on a 10 node cluster: scan of a small
  Kudu table, 1000 concurrent queries, load on the Kudu
  master was reduced significantly, from ~50% cpu to ~5%.
  (with the FE changes included)
- Ran the Kudu e2e tests.
- Manually ran a test with concurrent INSERTs and
  'ALTER TABLE ADD PARTITION' (which is affected by the
  Kudu side change mentioned above) and verified
  correctness.

Change-Id: I6b0c12a256c33e8ef32315b3736cae2dea2ae705
Reviewed-on: http://gerrit.cloudera.org:8080/6792
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/eb54287f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/eb54287f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/eb54287f

Branch: refs/heads/master
Commit: eb54287fb4c635c8fc6c96872e87ad5a98b16339
Parents: 7191e1b
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Tue Apr 25 19:30:46 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed May 24 22:27:37 2017 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node-base.cc  |  3 ++-
 be/src/exec/kudu-table-sink.cc      | 19 +++++++++----------
 be/src/exec/kudu-table-sink.h       | 11 ++++++-----
 be/src/exprs/kudu-partition-expr.cc |  5 +++--
 be/src/runtime/exec-env.cc          | 25 +++++++++++++++++++++++++
 be/src/runtime/exec-env.h           | 28 ++++++++++++++++++++++++++++
 be/src/runtime/query-state.cc       | 25 -------------------------
 be/src/runtime/query-state.h        | 26 --------------------------
 8 files changed, 73 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc 
b/be/src/exec/kudu-scan-node-base.cc
index fd09d6d..c8a7870 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -27,6 +27,7 @@
 #include "exec/kudu-scanner.h"
 #include "exec/kudu-util.h"
 #include "exprs/expr.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-state.h"
@@ -94,7 +95,7 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
   const KuduTableDescriptor* table_desc =
       static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
 
-  RETURN_IF_ERROR(runtime_state_->query_state()->GetKuduClient(
+  RETURN_IF_ERROR(runtime_state_->exec_env()->GetKuduClient(
       table_desc->kudu_master_addresses(), &client_));
 
   uint64_t latest_ts = static_cast<uint64_t>(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 75ca2fc..b5ffc27 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -25,6 +25,7 @@
 #include "exprs/expr-context.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gutil/gscoped_ptr.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "util/runtime-profile-counters.h"
@@ -77,10 +78,7 @@ KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
       table_id_(tsink.table_sink.target_table_id),
       select_list_texprs_(select_list_texprs),
       sink_action_(tsink.table_sink.action),
-      kudu_table_sink_(tsink.table_sink.kudu_table_sink),
-      total_rows_(NULL),
-      num_row_errors_(NULL),
-      rows_processed_rate_(NULL) {
+      kudu_table_sink_(tsink.table_sink.kudu_table_sink) {
   DCHECK(KuduIsAvailable());
 }
 
@@ -101,7 +99,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, 
MemTracker* parent_mem_tracke
 
   // Get the kudu table descriptor.
   TableDescriptor* table_desc = 
state->desc_tbl().GetTableDescriptor(table_id_);
-  DCHECK(table_desc != NULL);
+  DCHECK(table_desc != nullptr);
 
   // In debug mode try a dynamic cast. If it fails it means that the
   // TableDescriptor is not an instance of KuduTableDescriptor.
@@ -140,11 +138,12 @@ Status KuduTableSink::Open(RuntimeState* state) {
         "Could not allocate memory for KuduTableSink", required_mem);
   }
 
-  Status s = CreateKuduClient(table_desc_->kudu_master_addresses(), &client_);
+  Status s =
+      state->exec_env()->GetKuduClient(table_desc_->kudu_master_addresses(), 
&client_);
   if (!s.ok()) {
     // Close() releases memory if client_ is not NULL, but since the memory 
was consumed
     // and the client failed to be created, it must be released.
-    DCHECK(client_.get() == NULL);
+    DCHECK(client_ == nullptr);
     mem_tracker_->Release(required_mem);
     return s;
   }
@@ -233,7 +232,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* 
batch) {
           j : kudu_table_sink_.referenced_columns[j];
 
       void* value = output_expr_ctxs_[j]->GetValue(current_row);
-      if (value == NULL) {
+      if (value == nullptr) {
         if (table_schema.Column(col).is_nullable()) {
           KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),
               "Could not add Kudu WriteOp.");
@@ -335,9 +334,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
 
 void KuduTableSink::Close(RuntimeState* state) {
   if (closed_) return;
-  if (client_.get() != NULL) {
+  if (client_ != nullptr) {
     mem_tracker_->Release(FLAGS_kudu_sink_mem_required);
-    client_.reset();
+    client_ = nullptr;
   }
   SCOPED_TIMER(profile()->total_time_counter());
   Expr::Close(output_expr_ctxs_, state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 3dd831f..b2239fb 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -99,8 +99,9 @@ class KuduTableSink : public DataSink {
   const std::vector<TExpr>& select_list_texprs_;
   std::vector<ExprContext*> output_expr_ctxs_;
 
-  /// The Kudu client, table and session.
-  kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
+  /// The Kudu client, owned by the ExecEnv.
+  kudu::client::KuduClient* client_ = nullptr;
+  /// The Kudu table and session.
   kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
   kudu::client::sp::shared_ptr<kudu::client::KuduSession> session_;
 
@@ -118,14 +119,14 @@ class KuduTableSink : public DataSink {
 
   /// Total number of rows processed, i.e. rows written to Kudu and also rows 
with
   /// errors.
-  RuntimeProfile::Counter* total_rows_;
+  RuntimeProfile::Counter* total_rows_ = nullptr;
 
   /// The number of rows with errors.
-  RuntimeProfile::Counter* num_row_errors_;
+  RuntimeProfile::Counter* num_row_errors_ = nullptr;
 
   /// Rate at which the sink consumes and processes rows, i.e. writing rows to 
Kudu or
   /// skipping rows that are known to violate nullability constraints.
-  RuntimeProfile::Counter* rows_processed_rate_;
+  RuntimeProfile::Counter* rows_processed_rate_ = nullptr;
 };
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/exprs/kudu-partition-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/kudu-partition-expr.cc 
b/be/src/exprs/kudu-partition-expr.cc
index ea295af..46da82d 100644
--- a/be/src/exprs/kudu-partition-expr.cc
+++ b/be/src/exprs/kudu-partition-expr.cc
@@ -21,6 +21,7 @@
 
 #include "exec/kudu-util.h"
 #include "exprs/expr-context.h"
+#include "runtime/exec-env.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
@@ -45,8 +46,8 @@ Status KuduPartitionExpr::Prepare(
       << "Target table for KuduPartitioner must be a Kudu table.";
   table_desc_ = static_cast<KuduTableDescriptor*>(table_desc);
   kudu::client::KuduClient* client;
-  RETURN_IF_ERROR(
-      
state->query_state()->GetKuduClient(table_desc_->kudu_master_addresses(), 
&client));
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
+      table_desc_->kudu_master_addresses(), &client));
   kudu::client::sp::shared_ptr<kudu::client::KuduTable> table;
   KUDU_RETURN_IF_ERROR(
       client->OpenTable(table_desc_->table_name(), &table), "Failed to open 
Kudu table.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index d98b9d7..0354b37 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -22,8 +22,10 @@
 #include <boost/algorithm/string.hpp>
 #include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
+#include <kudu/client/client.h>
 
 #include "common/logging.h"
+#include "exec/kudu-util.h"
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "runtime/backend-client.h"
@@ -64,6 +66,7 @@
 #include "common/names.h"
 
 using boost::algorithm::is_any_of;
+using boost::algorithm::join;
 using boost::algorithm::split;
 using boost::algorithm::to_lower;
 using boost::algorithm::token_compress_on;
@@ -127,6 +130,10 @@ const static string DEFAULT_FS = "fs.defaultFS";
 
 namespace impala {
 
+struct ExecEnv::KuduClientPtr {
+  kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
+};
+
 ExecEnv* ExecEnv::exec_env_ = nullptr;
 
 ExecEnv::ExecEnv()
@@ -385,4 +392,22 @@ void ExecEnv::InitBufferPool(int64_t min_page_size, 
int64_t capacity) {
   buffer_reservation_.reset(new ReservationTracker());
   buffer_reservation_->InitRootTracker(nullptr, capacity);
 }
+
+Status ExecEnv::GetKuduClient(
+    const vector<string>& master_addresses, kudu::client::KuduClient** client) 
{
+  string master_addr_concat = join(master_addresses, ",");
+  lock_guard<SpinLock> l(kudu_client_map_lock_);
+  auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat);
+  if (kudu_client_map_it == kudu_client_map_.end()) {
+    // KuduClient doesn't exist, create it
+    KuduClientPtr* kudu_client_ptr = new KuduClientPtr;
+    RETURN_IF_ERROR(CreateKuduClient(master_addresses, 
&kudu_client_ptr->kudu_client));
+    kudu_client_map_[master_addr_concat].reset(kudu_client_ptr);
+    *client = kudu_client_ptr->kudu_client.get();
+  } else {
+    // Return existing KuduClient
+    *client = kudu_client_map_it->second->kudu_client.get();
+  }
+  return Status::OK();
+}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 43b0e0b..f914a83 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -19,12 +19,17 @@
 #ifndef IMPALA_RUNTIME_EXEC_ENV_H
 #define IMPALA_RUNTIME_EXEC_ENV_H
 
+#include <unordered_map>
+
 #include <boost/scoped_ptr.hpp>
 
 // NOTE: try not to add more headers here: exec-env.h is included in many many 
files.
 #include "common/status.h"
 #include "runtime/client-cache-types.h"
 #include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool
+#include "util/spinlock.h"
+
+namespace kudu { namespace client { class KuduClient; } }
 
 namespace impala {
 
@@ -48,6 +53,7 @@ class ReservationTracker;
 class Scheduler;
 class StatestoreSubscriber;
 class TestExecEnv;
+
 class ThreadResourceMgr;
 class TmpFileMgr;
 class Webserver;
@@ -120,6 +126,13 @@ class ExecEnv {
   /// Returns the configured defaultFs set in core-site.xml
   string default_fs() { return default_fs_; }
 
+  /// Gets a KuduClient for this list of master addresses. It will look up and 
share
+  /// an existing KuduClient if possible. Otherwise, it will create a new 
KuduClient
+  /// internally and return a pointer to it. All KuduClients accessed through 
this
+  /// interface are owned by the ExecEnv. Thread safe.
+  Status GetKuduClient(
+      const std::vector<std::string>& master_addrs, kudu::client::KuduClient** 
client);
+
  protected:
   /// Leave protected so that subclasses can override
   boost::scoped_ptr<MetricGroup> metrics_;
@@ -166,6 +179,21 @@ class ExecEnv {
   /// fs.defaultFs value set in core-site.xml
   std::string default_fs_;
 
+  SpinLock kudu_client_map_lock_; // protects kudu_client_map_
+
+  /// Opaque type for storing the pointer to the KuduClient. This allows us
+  /// to avoid including Kudu header files.
+  struct KuduClientPtr;
+
+  /// Map from the master addresses string for a Kudu table to the 
KuduClientPtr for
+  /// accessing that table. The master address string is constructed by joining
+  /// the sorted master address list entries with a comma separator.
+  typedef std::unordered_map<std::string, std::unique_ptr<KuduClientPtr>> 
KuduClientMap;
+
+  /// Map for sharing KuduClients across the ExecEnv. This map requires that 
the master
+  /// address lists be identical in order to share a KuduClient.
+  KuduClientMap kudu_client_map_;
+
   /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity.
   void InitBufferPool(int64_t min_page_len, int64_t capacity);
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1ae0f36..8fec487 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -19,10 +19,8 @@
 
 #include <boost/thread/lock_guard.hpp>
 #include <boost/thread/locks.hpp>
-#include <kudu/client/client.h>
 
 #include "exprs/expr.h"
-#include "exec/kudu-util.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/backend-client.h"
@@ -37,15 +35,10 @@
 
 #include "common/names.h"
 
-using boost::algorithm::join;
 using namespace impala;
 
 #define RETRY_SLEEP_MS 100
 
-struct QueryState::KuduClientPtr {
-  kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
-};
-
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
   DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
   query_state_ = 
ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -352,24 +345,6 @@ void QueryState::Cancel() {
   for (auto entry: fis_map_) entry.second->Cancel();
 }
 
-Status QueryState::GetKuduClient(
-    const vector<string>& master_addresses, kudu::client::KuduClient** client) 
{
-  string master_addr_concat = join(master_addresses, ",");
-  lock_guard<SpinLock> l(kudu_client_map_lock_);
-  auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat);
-  if (kudu_client_map_it == kudu_client_map_.end()) {
-    // KuduClient doesn't exist, create it
-    KuduClientPtr* kudu_client_ptr = new KuduClientPtr;
-    RETURN_IF_ERROR(CreateKuduClient(master_addresses, 
&kudu_client_ptr->kudu_client));
-    kudu_client_map_[master_addr_concat].reset(kudu_client_ptr);
-    *client = kudu_client_ptr->kudu_client.get();
-  } else {
-    // Return existing KuduClient
-    *client = kudu_client_map_it->second->kudu_client.get();
-  }
-  return Status::OK();
-}
-
 void QueryState::PublishFilter(int32_t filter_id, int fragment_idx,
     const TBloomFilter& thrift_bloom_filter) {
   if (!instances_prepared_promise_.Get().ok()) return;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb54287f/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 5660a49..9ce4316 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -31,8 +31,6 @@
 #include "util/uid-util.h"
 #include "util/promise.h"
 
-namespace kudu { namespace client { class KuduClient; } }
-
 namespace impala {
 
 class FragmentInstanceState;
@@ -151,14 +149,6 @@ class QueryState {
   /// Not idempotent, not thread-safe.
   void ReleaseResources();
 
-  /// Gets a KuduClient for this list of master addresses. It will lookup and 
share
-  /// an existing KuduClient if possible. Otherwise, it will create a new 
KuduClient
-  /// internally and return a pointer to it. All KuduClients accessed through 
this
-  /// interface are owned by the QueryState. Thread safe.
-  Status GetKuduClient(
-      const std::vector<std::string>& master_addrs, kudu::client::KuduClient** 
client)
-      WARN_UNUSED_RESULT;
-
   /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
   /// status must be an error. If fis is given, expects that fis finished its 
Prepare
   /// phase; it then sends a report for that instance, including its profile.
@@ -224,22 +214,6 @@ class QueryState {
   /// True if and only if ReleaseResources() has been called.
   bool released_resources_ = false;
 
-  SpinLock kudu_client_map_lock_; // protects kudu_client_map_
-
-  /// Opaque type for storing the pointer to the KuduClient. This allows us
-  /// to avoid including Kudu header files.
-  struct KuduClientPtr;
-
-  /// Map from the master addresses string for a Kudu table to the 
KuduClientPtr for
-  /// accessing that table. The master address string is constructed by joining
-  /// the master address list entries with a comma separator.
-  typedef std::unordered_map<std::string, std::unique_ptr<KuduClientPtr>> 
KuduClientMap;
-
-  /// Map for sharing KuduClients between fragment instances. Each Kudu table 
has
-  /// a list of master addresses stored in the Hive Metastore. This map 
requires
-  /// that the master address lists be identical in order to share a 
KuduClient.
-  KuduClientMap kudu_client_map_;
-
   /// Create QueryState w/ refcnt of 0.
   /// The query is associated with the resource pool query_ctx.request_pool or
   /// 'request_pool', if the former is not set (needed for tests).

Reply via email to