IMPALA-4478: Initial Kudu client mem tracking for sink

The Kudu client allocates memory that is not tracked by
Impala. There are several sources, but the most significant
is the memory allocated by the KuduSession on the write
path. This can exceed 100MB, so it must be tracked to
avoid OOM.

Moving forward, we should have a better way to track Kudu
client memory, but for now we must at least handle this
potentially problematic case.

This changes the KuduTableSink to consume 200MB, which should
be enough for the 100MB write mutation buffer as well as
100MB worth of errors buffered in the client before Impala
takes ownership of them (and deletes them). The amount is
exposed as a flag (kudu_sink_mem_required) because it may
turn out to be too high for some users and too low for
others. When we have better support from Kudu (including
KUDU-1752), we should simplify this.
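
The reserve-then-release pattern described above can be sketched
roughly as follows. This is a minimal illustration, not the code
in the patch: SimpleMemTracker, FakeClient and SinkSketch are
hypothetical stand-ins for Impala's MemTracker, the Kudu client
and KuduTableSink, and the limit passed to the tracker is an
assumption made only so the example can run on its own.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical stand-in for Impala's MemTracker: it models only the
// TryConsume()/Release() pair that the sink relies on.
class SimpleMemTracker {
 public:
  explicit SimpleMemTracker(int64_t limit) : limit_(limit) {}

  // Reserves 'bytes' if doing so stays within the limit.
  bool TryConsume(int64_t bytes) {
    if (consumed_ + bytes > limit_) return false;
    consumed_ += bytes;
    return true;
  }

  void Release(int64_t bytes) { consumed_ -= bytes; }
  int64_t consumed() const { return consumed_; }

 private:
  int64_t limit_;
  int64_t consumed_ = 0;
};

// Hypothetical placeholder for the Kudu client object.
struct FakeClient {};

// Sketch of a sink that reserves a fixed amount up front in Open()
// and gives it back in Close(), or immediately if client creation fails.
class SinkSketch {
 public:
  SinkSketch(SimpleMemTracker* tracker, int64_t required_mem)
      : tracker_(tracker), required_mem_(required_mem) {}

  // 'client_creation_succeeds' simulates the outcome of creating the client.
  bool Open(bool client_creation_succeeds) {
    if (!tracker_->TryConsume(required_mem_)) return false;
    if (!client_creation_succeeds) {
      // Close() only releases when a client exists, so release here.
      tracker_->Release(required_mem_);
      return false;
    }
    client_.reset(new FakeClient());
    return true;
  }

  void Close() {
    if (closed_) return;
    if (client_ != nullptr) {
      tracker_->Release(required_mem_);
      client_.reset();
    }
    closed_ = true;
  }

 private:
  SimpleMemTracker* tracker_;
  int64_t required_mem_;
  std::unique_ptr<FakeClient> client_;
  bool closed_ = false;
};
```

The existence of the client doubles as the "memory is held" marker,
so a failed Open() followed by Close() does not release twice.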

TODO: Handle DML w/ small or known resource requirements
(e.g. VALUES specified or query has LIMIT) specially to
avoid over-consumption.

Testing: Verified acceptable behavior in the stress test
with a simple workload containing DML statements of
moderate cardinality.

Change-Id: I47f17a81e4362ab490019382fedc66c25f07080a
Reviewed-on: http://gerrit.cloudera.org:8080/5152
Reviewed-by: Matthew Jacobs <[email protected]>
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7bfa6e3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7bfa6e3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7bfa6e3e

Branch: refs/heads/master
Commit: 7bfa6e3e3f20d741f36e713ea62f9324e5739ea0
Parents: af67b2f
Author: Matthew Jacobs <[email protected]>
Authored: Mon Nov 14 15:31:38 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Tue Nov 22 00:08:04 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc | 40 ++++++++++++++++++++++++++++++++++---
 1 file changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7bfa6e3e/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index a195e84..20bbe69 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -31,8 +31,25 @@
 
 #include "common/names.h"
 
-DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) of the "
-    "Kudu client buffer for mutations.");
+#define DEFAULT_KUDU_MUTATION_BUFFER_SIZE 100 * 1024 * 1024
+
+DEFINE_int32(kudu_mutation_buffer_size, DEFAULT_KUDU_MUTATION_BUFFER_SIZE,
+    "The size (bytes) of the Kudu client buffer for mutations.");
+
+// The memory (bytes) that this node needs to consume in order to operate. This is
+// necessary because the KuduClient allocates non-trivial amounts of untracked memory,
+// and is potentially unbounded due to how Kudu's async error reporting works.
+// Until Kudu's client memory usage can be bounded (KUDU-1752), we estimate that 2x the
+// mutation buffer size is enough memory, and that seems to provide acceptable results in
+// testing. This is still exposed as a flag for now though, because it may be possible
+// that in some cases this is always too high (in which case tracked mem >> RSS and the
+// memory is underutilized), or this may not be high enough (e.g. we underestimate the
+// size of error strings, and RSS grows until the process is killed).
+// TODO: Handle DML w/ small or known resource requirements (e.g. VALUES specified or
+// query has LIMIT) specially to avoid over-consumption.
+DEFINE_int32(kudu_sink_mem_required, 2 * DEFAULT_KUDU_MUTATION_BUFFER_SIZE,
+    "(Advanced) The memory required (bytes) for a KuduTableSink. The default value is "
+    " 2x the kudu_mutation_buffer_size. This flag is subject to change or removal.");
 
 DECLARE_int32(kudu_operation_timeout_ms);
 
@@ -116,8 +133,21 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
 
 Status KuduTableSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
-  RETURN_IF_ERROR(CreateKuduClient(table_desc_->kudu_master_addresses(), &client_));
 
+  int64_t required_mem = FLAGS_kudu_sink_mem_required;
+  if (!mem_tracker_->TryConsume(required_mem)) {
+    return mem_tracker_->MemLimitExceeded(state,
+        "Could not allocate memory for KuduTableSink", required_mem);
+  }
+
+  Status s = CreateKuduClient(table_desc_->kudu_master_addresses(), &client_);
+  if (!s.ok()) {
+    // Close() releases memory if client_ is not NULL, but since the memory was consumed
+    // and the client failed to be created, it must be released.
+    DCHECK(client_.get() == NULL);
+    mem_tracker_->Release(required_mem);
+    return s;
+  }
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
       "Unable to open Kudu table");
 
@@ -350,6 +380,10 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
 
 void KuduTableSink::Close(RuntimeState* state) {
   if (closed_) return;
+  if (client_.get() != NULL) {
+    mem_tracker_->Release(FLAGS_kudu_sink_mem_required);
+    client_.reset();
+  }
   SCOPED_TIMER(profile()->total_time_counter());
   Expr::Close(output_expr_ctxs_, state);
   DataSink::Close(state);
