IMPALA-7351: Add memory estimates to Kudu table sink

The Kudu table sink allocates untracked memory which is bounded by
limits that Impala enforces through the Kudu client API. This patch
adds a constant memory estimate to the Kudu table sink equal to the
sum of those limits (the --kudu_mutation_buffer_size and
--kudu_error_buffer_size flags).

Testing:
Updated the planner tests accordingly, including a new Kudu insert case
in resource-requirements.test.

Change-Id: I89a45dce0cfbbe3cc0bc17d55ffdbd41cd7dbfbd
Reviewed-on: http://gerrit.cloudera.org:8080/12077
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c209fed5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c209fed5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c209fed5

Branch: refs/heads/master
Commit: c209fed5350666c0b62004670714bdb404c39e06
Parents: 1cbcd0c
Author: Bikramjeet Vig <bikramjeet....@cloudera.com>
Authored: Tue Dec 11 15:50:31 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Thu Dec 13 03:46:35 2018 +0000

----------------------------------------------------------------------
 be/src/util/backend-gflag-util.cc               |  4 +
 common/thrift/BackendGflags.thrift              |  4 +
 .../apache/impala/planner/KuduTableSink.java    | 13 ++-
 .../PlannerTest/resource-requirements.test      | 91 +++++++++++++++-----
 4 files changed, 89 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 9f4f2d0..f78b317 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -68,6 +68,8 @@ DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
 DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
 DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
 DECLARE_int64(exchg_node_buffer_size_bytes);
+DECLARE_int32(kudu_mutation_buffer_size);
+DECLARE_int32(kudu_error_buffer_size);
 
 namespace impala {
 
@@ -135,6 +137,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
       FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
   cfg.__set_exchg_node_buffer_size_bytes(
       FLAGS_exchg_node_buffer_size_bytes);
+  cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
+  cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index dff4b8c..a56b260 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -109,4 +109,8 @@ struct TBackendGflags {
   41: required i64 catalog_partial_fetch_rpc_queue_timeout_s
 
   42: required i64 exchg_node_buffer_size_bytes
+
+  43: required i32 kudu_mutation_buffer_size
+
+  44: required i32 kudu_error_buffer_size
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index bb57b1f..ba6f93a 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -64,8 +65,16 @@ public class KuduTableSink extends TableSink {
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    // TODO: add a memory estimate
-    resourceProfile_ = ResourceProfile.noReservation(0);
+    // Most of the memory used by this sink is untracked. Part of it is allocated
+    // by the KuduSession on the write path and the rest stores Kudu client error
+    // messages. Both have upper limits (the kudu_mutation_buffer_size and
+    // kudu_error_buffer_size flags), which are used directly as the estimate.
+    long kuduMutationBufferSize = BackendConfig.INSTANCE.getBackendCfg().
+        kudu_mutation_buffer_size;
+    long kuduErrorBufferSize = BackendConfig.INSTANCE.getBackendCfg().
+        kudu_error_buffer_size;
+    resourceProfile_ = ResourceProfile.noReservation(kuduMutationBufferSize +
+        kuduErrorBufferSize);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index f86dd5a..7f4a12f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -2422,7 +2422,7 @@ from tpch.lineitem inner join /* +shuffle */ tpch.orders on l_orderkey = o_order
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=51.00MB Threads=3
 Per-Host Resource Estimates: Memory=446MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN 
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
 -- +shuffle
 tpch.orders ON l_orderkey = o_orderkey
 
@@ -2462,7 +2462,7 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=52.00MB Threads=6
 Per-Host Resource Estimates: Memory=300MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN 
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
 -- +shuffle
 tpch.orders ON l_orderkey = o_orderkey
 
@@ -2523,7 +2523,7 @@ Per-Host Resources: mem-estimate=89.00MB mem-reservation=9.00MB thread-reservati
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=7
 Per-Host Resource Estimates: Memory=481MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN 
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
 -- +shuffle
 tpch.orders ON l_orderkey = o_orderkey
 
@@ -4953,11 +4953,11 @@ join (
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=99.00MB Threads=5
 Per-Host Resource Estimates: Memory=180MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
 -- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT 
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
 -- +straight_join
 t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
 tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5037,11 +5037,11 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=100.50MB Threads=10
 Per-Host Resource Estimates: Memory=260MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
 -- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT 
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
 -- +straight_join
 t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
 tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5156,11 +5156,11 @@ Per-Host Resources: mem-estimate=88.84MB mem-reservation=59.00MB thread-reservat
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=176.50MB Threads=11
 Per-Host Resource Estimates: Memory=454MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
 -- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT 
+t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
 -- +straight_join
 t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
 tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
@@ -5315,11 +5315,11 @@ join (
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=176.00KB Threads=5
 Per-Host Resource Estimates: Memory=138MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
 -- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT 
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
 -- +straight_join
 t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
 tpch_parquet.supplier t4) v2) v1
@@ -5386,11 +5386,11 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=176.00KB Threads=9
 Per-Host Resource Estimates: Memory=161MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
 -- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT 
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
 -- +straight_join
 t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
 tpch_parquet.supplier t4) v2) v1
@@ -5485,11 +5485,11 @@ Per-Host Resources: mem-estimate=97.55MB mem-reservation=32.00KB thread-reservat
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=352.00KB Threads=9
 Per-Host Resource Estimates: Memory=311MB
-Analyzed query: SELECT 
+Analyzed query: SELECT
 -- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT 
+* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
 -- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT 
+t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
 -- +straight_join
 t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
 tpch_parquet.supplier t4) v2) v1
@@ -5830,3 +5830,52 @@ Per-Host Resources: mem-estimate=806.43MB mem-reservation=74.00MB thread-reserva
    tuple-ids=0 row-size=231B cardinality=6001215
    in pipelines: 00(GETNEXT)
 ====
+# kudu insert
+insert into functional_kudu.tinyinttable values(1);
+---- PLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=1
+Per-Host Resource Estimates: Memory=20MB
+Codegen disabled by planner
+Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT)
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=20.00MB mem-reservation=0B thread-reservation=1
+INSERT INTO KUDU [functional_kudu.tinyinttable]
+|  mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+|
+00:UNION
+   constant-operands=1
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=1B cardinality=1
+   in pipelines:
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=2.00MB Threads=2
+Per-Host Resource Estimates: Memory=22MB
+Codegen disabled by planner
+Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT)
+
+F01:PLAN FRAGMENT [KUDU(KuduPartition(1))] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=22.02MB mem-reservation=2.00MB thread-reservation=1
+INSERT INTO KUDU [functional_kudu.tinyinttable]
+|  mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
+|
+02:PARTIAL SORT
+|  order by: KuduPartition(1) ASC NULLS LAST, 1 ASC NULLS LAST
+|  materialized: KuduPartition(1)
+|  mem-estimate=2.00MB mem-reservation=2.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=5B cardinality=1
+|  in pipelines:
+|
+01:EXCHANGE [KUDU(KuduPartition(1))]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=1B cardinality=1
+|  in pipelines:
+|
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+00:UNION
+   constant-operands=1
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=1B cardinality=1
+   in pipelines:
+====

Reply via email to