IMPALA-7351: Add estimates to Kudu table sink. The Kudu table sink allocates untracked memory, which is bounded by limits that Impala enforces through the Kudu client API. This patch adds a constant memory estimate to the table sink, based on those limits.
Testing: Modified planner tests accordingly. Change-Id: I89a45dce0cfbbe3cc0bc17d55ffdbd41cd7dbfbd Reviewed-on: http://gerrit.cloudera.org:8080/12077 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c209fed5 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c209fed5 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c209fed5 Branch: refs/heads/master Commit: c209fed5350666c0b62004670714bdb404c39e06 Parents: 1cbcd0c Author: Bikramjeet Vig <bikramjeet....@cloudera.com> Authored: Tue Dec 11 15:50:31 2018 -0800 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Thu Dec 13 03:46:35 2018 +0000 ---------------------------------------------------------------------- be/src/util/backend-gflag-util.cc | 4 + common/thrift/BackendGflags.thrift | 4 + .../apache/impala/planner/KuduTableSink.java | 13 ++- .../PlannerTest/resource-requirements.test | 91 +++++++++++++++----- 4 files changed, 89 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/be/src/util/backend-gflag-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 9f4f2d0..f78b317 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -68,6 +68,8 @@ DECLARE_int64(kudu_scanner_thread_max_estimated_bytes); DECLARE_int32(catalog_max_parallel_partial_fetch_rpc); DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s); DECLARE_int64(exchg_node_buffer_size_bytes); +DECLARE_int32(kudu_mutation_buffer_size); +DECLARE_int32(kudu_error_buffer_size); namespace impala { @@ -135,6 +137,8 @@ Status GetThriftBackendGflags(JNIEnv* 
jni_env, jbyteArray* cfg_bytes) { FLAGS_catalog_partial_fetch_rpc_queue_timeout_s); cfg.__set_exchg_node_buffer_size_bytes( FLAGS_exchg_node_buffer_size_bytes); + cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size); + cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size); RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/common/thrift/BackendGflags.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index dff4b8c..a56b260 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -109,4 +109,8 @@ struct TBackendGflags { 41: required i64 catalog_partial_fetch_rpc_queue_timeout_s 42: required i64 exchg_node_buffer_size_bytes + + 43: required i32 kudu_mutation_buffer_size + + 44: required i32 kudu_error_buffer_size } http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java index bb57b1f..ba6f93a 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.impala.analysis.DescriptorTable; import org.apache.impala.catalog.FeTable; +import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TDataSink; import org.apache.impala.thrift.TDataSinkType; import org.apache.impala.thrift.TExplainLevel; @@ -64,8 +65,16 @@ public class KuduTableSink extends TableSink { @Override public void computeResourceProfile(TQueryOptions queryOptions) { - // TODO: add a memory estimate - 
resourceProfile_ = ResourceProfile.noReservation(0); + // The major chunk of memory used by this node is untracked. Part of which + // is allocated by the KuduSession on the write path and the rest is the + // memory used to store kudu client error messages. Fortunately, both of + // them have an upper limit which is used directly to set the estimates here. + long kuduMutationBufferSize = BackendConfig.INSTANCE.getBackendCfg(). + kudu_mutation_buffer_size; + long kuduErrorBufferSize = BackendConfig.INSTANCE.getBackendCfg(). + kudu_error_buffer_size; + resourceProfile_ = ResourceProfile.noReservation(kuduMutationBufferSize + + kuduErrorBufferSize); } @Override http://git-wip-us.apache.org/repos/asf/impala/blob/c209fed5/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index f86dd5a..7f4a12f 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -2422,7 +2422,7 @@ from tpch.lineitem inner join /* +shuffle */ tpch.orders on l_orderkey = o_order ---- PLAN Max Per-Host Resource Reservation: Memory=51.00MB Threads=3 Per-Host Resource Estimates: Memory=446MB -Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN +Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN -- +shuffle tpch.orders ON l_orderkey = o_orderkey @@ -2462,7 +2462,7 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=52.00MB Threads=6 Per-Host Resource Estimates: Memory=300MB -Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN +Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN -- +shuffle tpch.orders ON l_orderkey = o_orderkey @@ -2523,7 
+2523,7 @@ Per-Host Resources: mem-estimate=89.00MB mem-reservation=9.00MB thread-reservati ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=104.00MB Threads=7 Per-Host Resource Estimates: Memory=481MB -Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN +Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN -- +shuffle tpch.orders ON l_orderkey = o_orderkey @@ -4953,11 +4953,11 @@ join ( ---- PLAN Max Per-Host Resource Reservation: Memory=99.00MB Threads=5 Per-Host Resource Estimates: Memory=180MB -Analyzed query: SELECT +Analyzed query: SELECT -- +straight_join -* FROM tpch_parquet.orders t1 INNER JOIN (SELECT +* FROM tpch_parquet.orders t1 INNER JOIN (SELECT -- +straight_join -t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT +t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT -- +straight_join t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 = @@ -5037,11 +5037,11 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=100.50MB Threads=10 Per-Host Resource Estimates: Memory=260MB -Analyzed query: SELECT +Analyzed query: SELECT -- +straight_join -* FROM tpch_parquet.orders t1 INNER JOIN (SELECT +* FROM tpch_parquet.orders t1 INNER JOIN (SELECT -- +straight_join -t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT +t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT -- +straight_join t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 = @@ -5156,11 +5156,11 @@ Per-Host Resources: mem-estimate=88.84MB mem-reservation=59.00MB thread-reservat ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=176.50MB Threads=11 Per-Host Resource Estimates: Memory=454MB -Analyzed query: SELECT +Analyzed query: SELECT -- +straight_join -* FROM tpch_parquet.orders t1 
INNER JOIN (SELECT +* FROM tpch_parquet.orders t1 INNER JOIN (SELECT -- +straight_join -t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT +t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT -- +straight_join t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 = @@ -5315,11 +5315,11 @@ join ( ---- PLAN Max Per-Host Resource Reservation: Memory=176.00KB Threads=5 Per-Host Resource Estimates: Memory=138MB -Analyzed query: SELECT +Analyzed query: SELECT -- +straight_join -* FROM tpch_parquet.nation t1 INNER JOIN (SELECT +* FROM tpch_parquet.nation t1 INNER JOIN (SELECT -- +straight_join -t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT +t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT -- +straight_join t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN tpch_parquet.supplier t4) v2) v1 @@ -5386,11 +5386,11 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=176.00KB Threads=9 Per-Host Resource Estimates: Memory=161MB -Analyzed query: SELECT +Analyzed query: SELECT -- +straight_join -* FROM tpch_parquet.nation t1 INNER JOIN (SELECT +* FROM tpch_parquet.nation t1 INNER JOIN (SELECT -- +straight_join -t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT +t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT -- +straight_join t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN tpch_parquet.supplier t4) v2) v1 @@ -5485,11 +5485,11 @@ Per-Host Resources: mem-estimate=97.55MB mem-reservation=32.00KB thread-reservat ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=352.00KB Threads=9 Per-Host Resource Estimates: Memory=311MB -Analyzed query: SELECT +Analyzed query: SELECT -- +straight_join -* FROM tpch_parquet.nation t1 INNER JOIN (SELECT +* FROM tpch_parquet.nation t1 INNER 
JOIN (SELECT -- +straight_join -t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT +t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT -- +straight_join t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN tpch_parquet.supplier t4) v2) v1 @@ -5830,3 +5830,52 @@ Per-Host Resources: mem-estimate=806.43MB mem-reservation=74.00MB thread-reserva tuple-ids=0 row-size=231B cardinality=6001215 in pipelines: 00(GETNEXT) ==== +# kudu insert +insert into functional_kudu.tinyinttable values(1); +---- PLAN +Max Per-Host Resource Reservation: Memory=0B Threads=1 +Per-Host Resource Estimates: Memory=20MB +Codegen disabled by planner +Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT) + +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +| Per-Host Resources: mem-estimate=20.00MB mem-reservation=0B thread-reservation=1 +INSERT INTO KUDU [functional_kudu.tinyinttable] +| mem-estimate=20.00MB mem-reservation=0B thread-reservation=0 +| +00:UNION + constant-operands=1 + mem-estimate=0B mem-reservation=0B thread-reservation=0 + tuple-ids=0 row-size=1B cardinality=1 + in pipelines: +---- DISTRIBUTEDPLAN +Max Per-Host Resource Reservation: Memory=2.00MB Threads=2 +Per-Host Resource Estimates: Memory=22MB +Codegen disabled by planner +Analyzed query: SELECT CAST(1 AS TINYINT) UNION SELECT CAST(1 AS TINYINT) + +F01:PLAN FRAGMENT [KUDU(KuduPartition(1))] hosts=1 instances=1 +| Per-Host Resources: mem-estimate=22.02MB mem-reservation=2.00MB thread-reservation=1 +INSERT INTO KUDU [functional_kudu.tinyinttable] +| mem-estimate=20.00MB mem-reservation=0B thread-reservation=0 +| +02:PARTIAL SORT +| order by: KuduPartition(1) ASC NULLS LAST, 1 ASC NULLS LAST +| materialized: KuduPartition(1) +| mem-estimate=2.00MB mem-reservation=2.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=1 row-size=5B cardinality=1 +| in pipelines: +| +01:EXCHANGE [KUDU(KuduPartition(1))] +| mem-estimate=16.00KB 
mem-reservation=0B thread-reservation=0 +| tuple-ids=0 row-size=1B cardinality=1 +| in pipelines: +| +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1 +00:UNION + constant-operands=1 + mem-estimate=0B mem-reservation=0B thread-reservation=0 + tuple-ids=0 row-size=1B cardinality=1 + in pipelines: +====