This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c7ce233679917761c4435852f049ac4b1d05ccce
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Fri Sep 13 16:44:57 2024 +0200

    IMPALA-12594: Add flag to tune KrpcDataStreamSender mem estimate
    
    The way the planner estimates mem usage for KrpcDataStreamSender
    is very different than how the backend actually uses memory -
    the planner assumes that batch_size number of rows are sent at
    a time while the BE tries to limit it to
    data_stream_sender_buffer_size_ (but doesn't consider var len data).
    The Jira has more detail about differences and issues.
    
    This change adds flag data_stream_sender_buffer_size_used_by_planner.
    If this is set to 16K (data_stream_sender_buffer_size_ default)
    then the estimation will work similarly to BE.
    
    Tested that this can improve both under and overestimations:
    peak mem / mem estimate of the first sender:
    
    select distinct * from tpch_parquet.lineitem limit 100000
    default:
    284.04 KB        2.75 MB
    --data_stream_sender_buffer_size_used_by_planner=16384:
    282.04 KB      283.39 KB
    
    select distinct l_comment from tpch_parquet.lineitem limit 100000;
    default:
    747.71 KB      509.94 KB
    --data_stream_sender_buffer_size_used_by_planner=16384:
    740.71 KB      627.46 KB
    
    The default is not changed to avoid side effects. I would like
    to change it once BE's handling of var len data is fixed, which
    is a prerequisity to use mem reservation in KrpcDataStreamSender.
    
    Change-Id: I1e4b1db030be934cece565e3f2634ee7cbdb7c4f
    Reviewed-on: http://gerrit.cloudera.org:8080/21797
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/util/backend-gflag-util.cc                  |  7 ++++++
 common/thrift/BackendGflags.thrift                 |  2 ++
 .../org/apache/impala/planner/DataStreamSink.java  | 26 +++++++++++++++++++---
 .../org/apache/impala/planner/EmptySetNode.java    |  1 +
 .../java/org/apache/impala/planner/PlanNode.java   |  7 ++++++
 .../org/apache/impala/service/BackendConfig.java   |  4 ++++
 6 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 5d0f11908..f58d57223 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -274,6 +274,11 @@ DEFINE_double_hidden(query_cpu_root_factor, 1.5,
     "(Advance) The Nth root value to control sublinear scale down of unbounded 
"
     "cpu requirement for executor group set selection.");
 
+DEFINE_int64_hidden(data_stream_sender_buffer_size_used_by_planner, -1,
+    "Similar to data_stream_sender_buffer_size but used during planning."
+    "With default -1 the planner uses the old logic that is different"
+    "than how the backend actually works (see IMPALA-12594)");
+
 using strings::Substitute;
 
 namespace impala {
@@ -490,6 +495,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_dbcp_max_conn_pool_size(FLAGS_dbcp_max_conn_pool_size);
   cfg.__set_dbcp_max_wait_millis_for_conn(FLAGS_dbcp_max_wait_millis_for_conn);
   
cfg.__set_dbcp_data_source_idle_timeout(FLAGS_dbcp_data_source_idle_timeout_s);
+  cfg.__set_data_stream_sender_buffer_size_used_by_planner(
+      FLAGS_data_stream_sender_buffer_size_used_by_planner);
 #ifdef NDEBUG
   cfg.__set_is_release_build(true);
 #else
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index e0121b671..3a4d52ac6 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -310,4 +310,6 @@ struct TBackendGflags {
   139: required bool enable_catalogd_ha
 
   140: required bool iceberg_always_allow_merge_on_read_operations
+
+  141: required i64 data_stream_sender_buffer_size_used_by_planner
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java 
b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
index 5ea939a7c..7b280d86d 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
 import java.util.List;
 
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TDataStreamSink;
@@ -77,13 +78,32 @@ public class DataStreamSink extends DataSink {
    * equal to avgOutboundRowBatchSize. If outputPartiton_ is partitioned, all 
of the
    * channel's OutboundRowBatches are used. Otherwise, only a pair of 
OutboundRowBatches
    * in KrpcDataStreamSender class are used.
-   * TODO: this function can both over and under estimate the memory need
-   *       see IMPALA-12594 + IMPALA-12433
+   *
+   * Two different ways are used to calculate the number of rows sent in one
+   * OutboundRowBatch:
+   * If flag data_stream_sender_buffer_size_used_by_planner is set, it is used 
the
+   * way BE uses data_stream_sender_buffer_size in
+   * KrpcDataStreamSender::Channel::RowBatchCapacity().
+   * Otherwise simply batch_size is used.
+   *
+   * TODO(IMPALA-12594 + IMPALA-12433):
+   *    consider varlen size and max_row_size in BE and update planner
    */
   private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) {
+    long beBufferBytes =
+        BackendConfig.INSTANCE.dataStreamSenderBufferSizeUsedByPlanner();
+    // Calculated the rows fitting to buffer without considering var len data
+    // similarly to KrpcDataStreamSender::Channel::RowBatchCapacity().
+    long fixedLenRowSize = exchNode_.getFixedLenRowSize();
+    if (fixedLenRowSize==0) fixedLenRowSize = 1; // avoid division by 0
+    long beRowsPerBuffer =
+        Math.max(1, (long)Math.ceil(beBufferBytes/fixedLenRowSize));
+    // Broadcast sended uses row_batch for buffer capacity.
+    boolean useBeLogic  = beBufferBytes > 0 && 
outputPartition_.isPartitioned();
     int numChannels =
         outputPartition_.isPartitioned() ? 
exchNode_.getFragment().getNumInstances() : 1;
-    long rowBatchSize = PlanNode.getRowBatchSize(queryOptions);
+    long rowBatchSize =
+        useBeLogic ?  beRowsPerBuffer : PlanNode.getRowBatchSize(queryOptions);
     long avgOutboundRowBatchSize = Math.min(
         (long) Math.ceil(rowBatchSize * 
ExchangeNode.getAvgSerializedRowSize(exchNode_)),
         PlanNode.ROWBATCH_MAX_MEM_USAGE);
diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java 
b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
index 7ac86da4f..21b5f191d 100644
--- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
@@ -44,6 +44,7 @@ public class EmptySetNode extends PlanNode {
   public void computeStats(Analyzer analyzer) {
     avgRowSize_ = 0;
     rowPadSize_ = 0;
+    getFixedLenRowSize_ = 0;
     cardinality_ = 0;
     numInstances_ = numNodes_ = 1;
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 3f95e8b8c..16c253374 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -165,6 +165,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // Set in computeStats().
   protected float rowPadSize_;
 
+  // Row size without varlen data.
+  protected long getFixedLenRowSize_;
+
   // If true, disable codegen for this plan node.
   protected boolean disableCodegen_;
 
@@ -258,6 +261,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public float getAvgRowSizeWithoutPad() {
     return Math.max(0.0F, avgRowSize_ - rowPadSize_);
   }
+  public long getFixedLenRowSize() { return getFixedLenRowSize_; }
+
   public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
   public PlanFragment getFragment() { return fragment_; }
   public List<Expr> getConjuncts() { return conjuncts_; }
@@ -719,10 +724,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> 
{
   public void computeStats(Analyzer analyzer) {
     avgRowSize_ = 0.0F;
     rowPadSize_ = 0.0F;
+    getFixedLenRowSize_ = 0;
     for (TupleId tid: tupleIds_) {
       TupleDescriptor desc = analyzer.getTupleDesc(tid);
       avgRowSize_ += desc.getAvgSerializedSize();
       rowPadSize_ += desc.getSerializedPadSize();
+      getFixedLenRowSize_ += desc.getByteSize();
     }
     if (!children_.isEmpty()) {
       numNodes_ = getChild(0).numNodes_;
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 29eeaf3e4..e6d26e70a 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -444,6 +444,10 @@ public class BackendConfig {
     return backendCfg_.iceberg_always_allow_merge_on_read_operations;
   }
 
+  public long dataStreamSenderBufferSizeUsedByPlanner() {
+    return backendCfg_.data_stream_sender_buffer_size_used_by_planner;
+  }
+
   public boolean isJsonScannerEnabled() {
     return backendCfg_.enable_json_scanner;
   }

Reply via email to