IMPALA-4478: Initial Kudu client mem tracking for sink

The Kudu client allocates memory which is not tracked by Impala.
There are several sources, but the most significant is the memory
allocated by the KuduSession on the write path. This can be >100MB,
so it is important to track to avoid OOM.

Moving forward, we should have a better way to track Kudu client
memory, but for now we must at least handle this potentially
problematic case.

This changes the KuduTableSink to consume 200MB which should be
enough for the 100MB write mutation buffer as well as 100MB worth
of errors buffered in the client before Impala takes ownership of
them (and deletes them). This is left as a flag because it may
turn out to be too high for some users and too low for others.
When we have better support from Kudu (including KUDU-1752), we
should simplify this.

TODO: Handle DML w/ small or known resource requirements
(e.g. VALUES specified or query has LIMIT) specially to avoid
over-consumption.

Testing: Have verified acceptable behavior in the stress test with a
simple workload containing DML statements of moderate cardinality.

Change-Id: I47f17a81e4362ab490019382fedc66c25f07080a
Reviewed-on: http://gerrit.cloudera.org:8080/5152
Reviewed-by: Matthew Jacobs <[email protected]>
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7bfa6e3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7bfa6e3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7bfa6e3e

Branch: refs/heads/master
Commit: 7bfa6e3e3f20d741f36e713ea62f9324e5739ea0
Parents: af67b2f
Author: Matthew Jacobs <[email protected]>
Authored: Mon Nov 14 15:31:38 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Tue Nov 22 00:08:04 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc | 40 ++++++++++++++++++++++++++++++++++---
 1 file changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7bfa6e3e/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index a195e84..20bbe69 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -31,8 +31,25 @@
 
 #include "common/names.h"
 
-DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) of the "
-    "Kudu client buffer for mutations.");
+#define DEFAULT_KUDU_MUTATION_BUFFER_SIZE 100 * 1024 * 1024
+
+DEFINE_int32(kudu_mutation_buffer_size, DEFAULT_KUDU_MUTATION_BUFFER_SIZE,
+    "The size (bytes) of the Kudu client buffer for mutations.");
+
+// The memory (bytes) that this node needs to consume in order to operate. This is
+// necessary because the KuduClient allocates non-trivial amounts of untracked memory,
+// and is potentially unbounded due to how Kudu's async error reporting works.
+// Until Kudu's client memory usage can be bounded (KUDU-1752), we estimate that 2x the
+// mutation buffer size is enough memory, and that seems to provide acceptable results in
+// testing. This is still exposed as a flag for now though, because it may be possible
+// that in some cases this is always too high (in which case tracked mem >> RSS and the
+// memory is underutilized), or this may not be high enough (e.g. we underestimate the
+// size of error strings, and RSS grows until the process is killed).
+// TODO: Handle DML w/ small or known resource requirements (e.g. VALUES specified or
+// query has LIMIT) specially to avoid over-consumption.
+DEFINE_int32(kudu_sink_mem_required, 2 * DEFAULT_KUDU_MUTATION_BUFFER_SIZE,
+    "(Advanced) The memory required (bytes) for a KuduTableSink. The default value is "
+    " 2x the kudu_mutation_buffer_size. This flag is subject to change or removal.");
 
 DECLARE_int32(kudu_operation_timeout_ms);
 
@@ -116,8 +133,21 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
 
 Status KuduTableSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
-  RETURN_IF_ERROR(CreateKuduClient(table_desc_->kudu_master_addresses(), &client_));
 
+  int64_t required_mem = FLAGS_kudu_sink_mem_required;
+  if (!mem_tracker_->TryConsume(required_mem)) {
+    return mem_tracker_->MemLimitExceeded(state,
+        "Could not allocate memory for KuduTableSink", required_mem);
+  }
+
+  Status s = CreateKuduClient(table_desc_->kudu_master_addresses(), &client_);
+  if (!s.ok()) {
+    // Close() releases memory if client_ is not NULL, but since the memory was consumed
+    // and the client failed to be created, it must be released.
+    DCHECK(client_.get() == NULL);
+    mem_tracker_->Release(required_mem);
+    return s;
+  }
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
       "Unable to open Kudu table");
 
@@ -350,6 +380,10 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
 
 void KuduTableSink::Close(RuntimeState* state) {
   if (closed_) return;
+  if (client_.get() != NULL) {
+    mem_tracker_->Release(FLAGS_kudu_sink_mem_required);
+    client_.reset();
+  }
   SCOPED_TIMER(profile()->total_time_counter());
   Expr::Close(output_expr_ctxs_, state);
   DataSink::Close(state);
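
The reserve-then-release accounting in Open() and Close() above can be sketched in isolation. This is a minimal illustration only, not Impala's code: `ToyMemTracker`, `ToySink`, `kMutationBufferBytes`, `kSinkMemRequired`, and the `client_ok` parameter are hypothetical names invented here, and the real MemTracker does considerably more than this counter. It only mirrors the three paths in the diff: reservation refused, client creation failing after a successful reservation, and a normal Close().

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a memory tracker: a byte counter with a hard limit.
class ToyMemTracker {
 public:
  explicit ToyMemTracker(int64_t limit) : limit_(limit) {}

  // Reserves `bytes` if the total stays within the limit; otherwise changes nothing.
  bool TryConsume(int64_t bytes) {
    if (bytes < 0 || consumed_ > limit_ - bytes) return false;
    consumed_ += bytes;
    return true;
  }

  void Release(int64_t bytes) { consumed_ -= bytes; }
  int64_t consumed() const { return consumed_; }

 private:
  const int64_t limit_;
  int64_t consumed_ = 0;
};

// Assumed sizes matching the defaults in the diff: a 100MB mutation buffer plus an
// equal allowance for buffered errors. Typed constants are used here instead of a
// macro so the values compose safely in any expression.
constexpr int64_t kMutationBufferBytes = 100LL * 1024 * 1024;
constexpr int64_t kSinkMemRequired = 2 * kMutationBufferBytes;
static_assert(kSinkMemRequired == 209715200, "2 x 100MB in bytes");
static_assert(kSinkMemRequired <= INT32_MAX, "default fits an int32 flag");

// Hypothetical sink that reserves its whole estimate up front.
class ToySink {
 public:
  explicit ToySink(ToyMemTracker* tracker) : tracker_(tracker) {}

  // `client_ok` simulates whether creating the client succeeds.
  bool Open(bool client_ok) {
    if (!tracker_->TryConsume(kSinkMemRequired)) return false;
    if (!client_ok) {
      // Close() only releases when a client exists, so undo the reservation here.
      tracker_->Release(kSinkMemRequired);
      return false;
    }
    has_client_ = true;
    return true;
  }

  // Idempotent: releases the reservation at most once.
  void Close() {
    if (closed_) return;
    if (has_client_) {
      tracker_->Release(kSinkMemRequired);
      has_client_ = false;
    }
    closed_ = true;
  }

 private:
  ToyMemTracker* tracker_;
  bool has_client_ = false;
  bool closed_ = false;
};
```

With a 300MB limit, for example, one sink opens and a second is refused, since two 200MB reservations would exceed the limit. After the first sink is closed the tracked total returns to zero, and a failed client creation leaves it at zero as well.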
