This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c7ce233679917761c4435852f049ac4b1d05ccce
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Fri Sep 13 16:44:57 2024 +0200

IMPALA-12594: Add flag to tune KrpcDataStreamSender mem estimate

The way the planner estimates memory usage for KrpcDataStreamSender is very different from how the backend actually uses memory: the planner assumes that batch_size rows are sent at a time, while the BE tries to limit each batch to data_stream_sender_buffer_size_ bytes (but doesn't consider var-len data). The Jira has more detail about the differences and issues.

This change adds the flag data_stream_sender_buffer_size_used_by_planner. If it is set to 16K (the default of data_stream_sender_buffer_size_), the estimation works similarly to the BE.

Tested that this can improve both under- and overestimations (peak mem / mem estimate of the first sender):

select distinct * from tpch_parquet.lineitem limit 100000
  default:                                                 284.04 KB / 2.75 MB
  --data_stream_sender_buffer_size_used_by_planner=16384:  282.04 KB / 283.39 KB

select distinct l_comment from tpch_parquet.lineitem limit 100000;
  default:                                                 747.71 KB / 509.94 KB
  --data_stream_sender_buffer_size_used_by_planner=16384:  740.71 KB / 627.46 KB

The default is not changed to avoid side effects. I would like to change it once the BE's handling of var-len data is fixed, which is a prerequisite for using mem reservation in KrpcDataStreamSender.
Change-Id: I1e4b1db030be934cece565e3f2634ee7cbdb7c4f Reviewed-on: http://gerrit.cloudera.org:8080/21797 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/util/backend-gflag-util.cc | 7 ++++++ common/thrift/BackendGflags.thrift | 2 ++ .../org/apache/impala/planner/DataStreamSink.java | 26 +++++++++++++++++++--- .../org/apache/impala/planner/EmptySetNode.java | 1 + .../java/org/apache/impala/planner/PlanNode.java | 7 ++++++ .../org/apache/impala/service/BackendConfig.java | 4 ++++ 6 files changed, 44 insertions(+), 3 deletions(-) diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 5d0f11908..f58d57223 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -274,6 +274,11 @@ DEFINE_double_hidden(query_cpu_root_factor, 1.5, "(Advance) The Nth root value to control sublinear scale down of unbounded " "cpu requirement for executor group set selection."); +DEFINE_int64_hidden(data_stream_sender_buffer_size_used_by_planner, -1, + "Similar to data_stream_sender_buffer_size but used during planning." 
+ "With default -1 the planner uses the old logic that is different" + "than how the backend actually works (see IMPALA-12594)"); + using strings::Substitute; namespace impala { @@ -490,6 +495,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_dbcp_max_conn_pool_size(FLAGS_dbcp_max_conn_pool_size); cfg.__set_dbcp_max_wait_millis_for_conn(FLAGS_dbcp_max_wait_millis_for_conn); cfg.__set_dbcp_data_source_idle_timeout(FLAGS_dbcp_data_source_idle_timeout_s); + cfg.__set_data_stream_sender_buffer_size_used_by_planner( + FLAGS_data_stream_sender_buffer_size_used_by_planner); #ifdef NDEBUG cfg.__set_is_release_build(true); #else diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index e0121b671..3a4d52ac6 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -310,4 +310,6 @@ struct TBackendGflags { 139: required bool enable_catalogd_ha 140: required bool iceberg_always_allow_merge_on_read_operations + + 141: required i64 data_stream_sender_buffer_size_used_by_planner } diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java index 5ea939a7c..7b280d86d 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java @@ -20,6 +20,7 @@ package org.apache.impala.planner; import java.util.List; import org.apache.impala.analysis.Expr; +import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TDataSink; import org.apache.impala.thrift.TDataSinkType; import org.apache.impala.thrift.TDataStreamSink; @@ -77,13 +78,32 @@ public class DataStreamSink extends DataSink { * equal to avgOutboundRowBatchSize. If outputPartiton_ is partitioned, all of the * channel's OutboundRowBatches are used. Otherwise, only a pair of OutboundRowBatches * in KrpcDataStreamSender class are used. 
- * TODO: this function can both over and under estimate the memory need - * see IMPALA-12594 + IMPALA-12433 + * + * Two different ways are used to calculate the number of rows sent in one + * OutboundRowBatch: + * If flag data_stream_sender_buffer_size_used_by_planner is set, it is used the + * way BE uses data_stream_sender_buffer_size in + * KrpcDataStreamSender::Channel::RowBatchCapacity(). + * Otherwise simply batch_size is used. + * + * TODO(IMPALA-12594 + IMPALA-12433): + * consider varlen size and max_row_size in BE and update planner */ private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) { + long beBufferBytes = + BackendConfig.INSTANCE.dataStreamSenderBufferSizeUsedByPlanner(); + // Calculated the rows fitting to buffer without considering var len data + // similarly to KrpcDataStreamSender::Channel::RowBatchCapacity(). + long fixedLenRowSize = exchNode_.getFixedLenRowSize(); + if (fixedLenRowSize==0) fixedLenRowSize = 1; // avoid division by 0 + long beRowsPerBuffer = + Math.max(1, (long)Math.ceil(beBufferBytes/fixedLenRowSize)); + // Broadcast sended uses row_batch for buffer capacity. + boolean useBeLogic = beBufferBytes > 0 && outputPartition_.isPartitioned(); int numChannels = outputPartition_.isPartitioned() ? exchNode_.getFragment().getNumInstances() : 1; - long rowBatchSize = PlanNode.getRowBatchSize(queryOptions); + long rowBatchSize = + useBeLogic ? 
beRowsPerBuffer : PlanNode.getRowBatchSize(queryOptions); long avgOutboundRowBatchSize = Math.min( (long) Math.ceil(rowBatchSize * ExchangeNode.getAvgSerializedRowSize(exchNode_)), PlanNode.ROWBATCH_MAX_MEM_USAGE); diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java index 7ac86da4f..21b5f191d 100644 --- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java +++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java @@ -44,6 +44,7 @@ public class EmptySetNode extends PlanNode { public void computeStats(Analyzer analyzer) { avgRowSize_ = 0; rowPadSize_ = 0; + getFixedLenRowSize_ = 0; cardinality_ = 0; numInstances_ = numNodes_ = 1; } diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 3f95e8b8c..16c253374 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -165,6 +165,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // Set in computeStats(). protected float rowPadSize_; + // Row size without varlen data. + protected long getFixedLenRowSize_; + // If true, disable codegen for this plan node. 
protected boolean disableCodegen_; @@ -258,6 +261,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> { public float getAvgRowSizeWithoutPad() { return Math.max(0.0F, avgRowSize_ - rowPadSize_); } + public long getFixedLenRowSize() { return getFixedLenRowSize_; } + public void setFragment(PlanFragment fragment) { fragment_ = fragment; } public PlanFragment getFragment() { return fragment_; } public List<Expr> getConjuncts() { return conjuncts_; } @@ -719,10 +724,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> { public void computeStats(Analyzer analyzer) { avgRowSize_ = 0.0F; rowPadSize_ = 0.0F; + getFixedLenRowSize_ = 0; for (TupleId tid: tupleIds_) { TupleDescriptor desc = analyzer.getTupleDesc(tid); avgRowSize_ += desc.getAvgSerializedSize(); rowPadSize_ += desc.getSerializedPadSize(); + getFixedLenRowSize_ += desc.getByteSize(); } if (!children_.isEmpty()) { numNodes_ = getChild(0).numNodes_; diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index 29eeaf3e4..e6d26e70a 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -444,6 +444,10 @@ public class BackendConfig { return backendCfg_.iceberg_always_allow_merge_on_read_operations; } + public long dataStreamSenderBufferSizeUsedByPlanner() { + return backendCfg_.data_stream_sender_buffer_size_used_by_planner; + } + public boolean isJsonScannerEnabled() { return backendCfg_.enable_json_scanner; }
